/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2001-2005
 *	Sleepycat Software.  All rights reserved.
 *
 * $Id: rep_util.c,v 12.42 2005/10/27 01:26:02 mjc Exp $
 */

#include "db_config.h"

#ifndef NO_SYSTEM_INCLUDES
#if TIME_WITH_SYS_TIME
#include <sys/time.h>
#include <time.h>
#else
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#else
#include <time.h>
#endif
#endif

#include <stdlib.h>
#include <string.h>
#endif

#include "db_int.h"
#include "dbinc/log.h"
#include "dbinc/txn.h"

#ifdef REP_DIAGNOSTIC
#include "dbinc/db_page.h"
#include "dbinc/fop.h"
#include "dbinc/btree.h"
#include "dbinc/hash.h"
#include "dbinc/qam.h"
#endif

/*
 * rep_util.c:
 *	Miscellaneous replication-related utility functions, including
 *	those called by other subsystems.
 */

/*
 * TIMESTAMP_CHECK --
 *	If the environment's replication lockout timestamp is set and older
 *	than DB_REGENV_TIMEOUT relative to ts, clear the lockout flag and
 *	the timestamp under the replication system lock.
 */
#define	TIMESTAMP_CHECK(dbenv, ts, renv) do {				\
	if ((renv)->op_timestamp != 0 &&				\
	    (renv)->op_timestamp + DB_REGENV_TIMEOUT < (ts)) {		\
		REP_SYSTEM_LOCK(dbenv);					\
		F_CLR((renv), DB_REGENV_REPLOCKED);			\
		(renv)->op_timestamp = 0;				\
		REP_SYSTEM_UNLOCK(dbenv);				\
	}								\
} while (0)

#ifdef REP_DIAGNOSTIC
static void __rep_print_logmsg __P((DB_ENV *, const DBT *, DB_LSN *));
#endif

/*
 * __rep_bulk_message --
 *	This is a wrapper for putting a record into a bulk buffer.  Since
 * we have different bulk buffers, the caller must hand us the information
 * we need to put the record into the correct buffer.  All bulk buffers
 * are protected by the REP->mtx_clientdb.
 *
 * Each record is stored as its length (dbt->size), its LSN, and then
 * the record bytes.
 *
 * Returns DB_REP_BULKOVF if the record can never fit in the buffer (the
 * caller should send it as a singleton), 0 on success or when the
 * throttle limit was hit, or a non-zero value from the send/throttle
 * path.
 *
 * PUBLIC: int __rep_bulk_message __P((DB_ENV *, REP_BULK *, REP_THROTTLE *,
 * PUBLIC:     DB_LSN *, const DBT *, u_int32_t));
 */
int
__rep_bulk_message(dbenv, bulk, repth, lsn, dbt, flags)
	DB_ENV *dbenv;
	REP_BULK *bulk;
	REP_THROTTLE *repth;
	DB_LSN *lsn;
	const DBT *dbt;
	u_int32_t flags;
{
	DB_REP *db_rep;
	REP *rep;
	int ret;
	u_int32_t recsize, typemore;
	u_int8_t *p;
#ifdef DIAGNOSTIC
	DB_MSGBUF mb;
#endif

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	ret = 0;

	/*
	 * Figure out the total number of bytes needed for this record.
	 */
	recsize = dbt->size + sizeof(DB_LSN) + sizeof(dbt->size);

	/*
	 * If *this* buffer is actively being transmitted, wait until
	 * we can use it.
	 */
	MUTEX_LOCK(dbenv, rep->mtx_clientdb);
	while (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) {
		MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
		__os_sleep(dbenv, 1, 0);
		MUTEX_LOCK(dbenv, rep->mtx_clientdb);
	}

	/*
	 * If the record is bigger than the buffer entirely, send the
	 * current buffer and then return DB_REP_BULKOVF so that this
	 * record is sent as a singleton.  Do we have enough info to
	 * do that here?  XXX
	 */
	if (recsize > bulk->len) {
		RPRINT(dbenv, rep, (dbenv, &mb,
		    "bulk_msg: Record %lu (%#lx) larger than entire buffer %#lx",
		    (u_long)recsize, (u_long)recsize, (u_long)bulk->len));
		rep->stat.st_bulk_overflows++;
		(void)__rep_send_bulk(dbenv, bulk, flags);
		/*
		 * XXX __rep_send_message...
		 */
		MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
		return (DB_REP_BULKOVF);
	}

	/*
	 * If this record doesn't fit, send the current buffer.
	 * Sending the buffer will reset the offset, but we will
	 * drop the mutex while sending so we need to keep checking
	 * if we're racing.
	 *
	 * A failed send leaves the offset unchanged, so the record still
	 * does not fit.  Return the error here.  Falling through would copy
	 * the record past the end of the buffer.
	 */
	while (recsize + *(bulk->offp) > bulk->len) {
		RPRINT(dbenv, rep, (dbenv, &mb,
	    "bulk_msg: Record %lu (%#lx) doesn't fit. Send %lu (%#lx) now.",
		    (u_long)recsize, (u_long)recsize,
		    (u_long)bulk->len, (u_long)bulk->len));
		rep->stat.st_bulk_fills++;
		if ((ret = __rep_send_bulk(dbenv, bulk, flags)) != 0) {
			MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
			return (ret);
		}
	}

	/*
	 * If we're using throttling, see if we are at the throttling
	 * limit before we do any more work here.  We check whether the
	 * call to __rep_send_throttle changed repth->type to the *_MORE
	 * message type.  If the throttling code hits the limit, then
	 * we're done here.
	 */
	if (bulk->type == REP_BULK_LOG)
		typemore = REP_LOG_MORE;
	else
		typemore = REP_PAGE_MORE;
	if (repth != NULL &&
	    (ret = __rep_send_throttle(dbenv,
	    bulk->eid, repth, REP_THROTTLE_ONLY)) == 0 &&
	    repth->type == typemore) {
		RPRINT(dbenv, rep, (dbenv, &mb,
		    "bulk_msg: Record %lu (%#lx) hit throttle limit.",
		    (u_long)recsize, (u_long)recsize));
		MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
		return (ret);
	}

	/*
	 * Now we own the buffer, and we know our record fits into it.
	 * The buffer is structured with the len, LSN and then the record.
	 * Copy the record into the buffer.  Then if we need to,
	 * send the buffer.
	 */
	/*
	 * First thing is the length of the dbt record.
	 */
	p = bulk->addr + *(bulk->offp);
	memcpy(p, &dbt->size, sizeof(dbt->size));
	p += sizeof(dbt->size);

	/*
	 * The next thing is the LSN.  We need LSNs for both pages and
	 * log records.  For log records, this is obviously the LSN of
	 * this record.  For pages, the LSN is used by the internal init code.
	 */
	memcpy(p, lsn, sizeof(DB_LSN));
	RPRINT(dbenv, rep, (dbenv, &mb,
	    "bulk_msg: Copying LSN [%lu][%lu] of %lu bytes to %#lx",
	    (u_long)lsn->file, (u_long)lsn->offset, (u_long)dbt->size,
	    P_TO_ULONG(p)));
	p += sizeof(DB_LSN);

	/*
	 * If we're the first record, we need to save the first
	 * LSN in the bulk structure.
	 */
	if (*(bulk->offp) == 0)
		bulk->lsn = *lsn;

	/*
	 * Now copy the record and finally adjust the offset.
	 */
	memcpy(p, dbt->data, dbt->size);
	p += dbt->size;
	*(bulk->offp) = (uintptr_t)p - (uintptr_t)bulk->addr;
	rep->stat.st_bulk_records++;

	/*
	 * Send the buffer if it is a perm record or a force.
	 */
	if (LF_ISSET(DB_LOG_PERM) || FLD_ISSET(*(bulk->flagsp), BULK_FORCE)) {
		RPRINT(dbenv, rep, (dbenv, &mb,
		    "bulk_msg: Send buffer after copy due to %s",
		    LF_ISSET(DB_LOG_PERM) ? "PERM" : "FORCE"));
		ret = __rep_send_bulk(dbenv, bulk, flags);
	}
	MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
	return (ret);
}

/*
 * __rep_send_bulk --
 *	This function transmits the bulk buffer given.  It assumes the
 * caller holds the REP->mtx_clientdb.
We may release it and reacquire * it during this call. We will return with it held. * * PUBLIC: int __rep_send_bulk __P((DB_ENV *, REP_BULK *, u_int32_t)); */ int __rep_send_bulk(dbenv, bulkp, flags) DB_ENV *dbenv; REP_BULK *bulkp; u_int32_t flags; { DB_REP *db_rep; REP *rep; DBT dbt; int ret; #ifdef DIAGNOSTIC DB_MSGBUF mb; #endif /* * If the offset is 0, we're done. There is nothing to send. */ if (*(bulkp->offp) == 0) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; memset(&dbt, 0, sizeof(dbt)); /* * Set that this buffer is being actively transmitted. */ FLD_SET(*(bulkp->flagsp), BULK_XMIT); dbt.data = bulkp->addr; dbt.size = (u_int32_t)*(bulkp->offp); MUTEX_UNLOCK(dbenv, rep->mtx_clientdb); RPRINT(dbenv, rep, (dbenv, &mb, "send_bulk: Send %d (0x%x) bulk buffer bytes", dbt.size, dbt.size)); /* * Unlocked the mutex and now send the message. */ rep->stat.st_bulk_transfers++; ret = __rep_send_message(dbenv, bulkp->eid, bulkp->type, &bulkp->lsn, &dbt, flags, 0); MUTEX_LOCK(dbenv, rep->mtx_clientdb); /* * If we're successful, reset the offset pointer to 0. * Clear the transmit flag regardless. */ if (ret == 0) *(bulkp->offp) = 0; FLD_CLR(*(bulkp->flagsp), BULK_XMIT); return (ret); } /* * __rep_bulk_alloc -- * This function allocates and initializes an internal bulk buffer. * This is used by the master when fulfilling a request for a chunk of * log records or a bunch of pages. 
* * PUBLIC: int __rep_bulk_alloc __P((DB_ENV *, REP_BULK *, int, uintptr_t *, * PUBLIC: u_int32_t *, u_int32_t)); */ int __rep_bulk_alloc(dbenv, bulkp, eid, offp, flagsp, type) DB_ENV *dbenv; REP_BULK *bulkp; int eid; uintptr_t *offp; u_int32_t *flagsp, type; { int ret; memset(bulkp, 0, sizeof(REP_BULK)); *offp = *flagsp = 0; bulkp->len = MEGABYTE; if ((ret = __os_malloc(dbenv, bulkp->len, &bulkp->addr)) != 0) return (ret); bulkp->offp = offp; bulkp->type = type; bulkp->eid = eid; bulkp->flagsp = flagsp; return (ret); } /* * __rep_bulk_free -- * This function sends the remainder of the bulk buffer and frees it. * * PUBLIC: int __rep_bulk_free __P((DB_ENV *, REP_BULK *, u_int32_t)); */ int __rep_bulk_free(dbenv, bulkp, flags) DB_ENV *dbenv; REP_BULK *bulkp; u_int32_t flags; { DB_REP *db_rep; int ret; db_rep = dbenv->rep_handle; MUTEX_LOCK(dbenv, db_rep->region->mtx_clientdb); ret = __rep_send_bulk(dbenv, bulkp, flags); MUTEX_UNLOCK(dbenv, db_rep->region->mtx_clientdb); __os_free(dbenv, bulkp->addr); return (ret); } /* * __rep_send_message -- * This is a wrapper for sending a message. It takes care of constructing * the REP_CONTROL structure and calling the user's specified send function. * * PUBLIC: int __rep_send_message __P((DB_ENV *, int, * PUBLIC: u_int32_t, DB_LSN *, const DBT *, u_int32_t, u_int32_t)); */ int __rep_send_message(dbenv, eid, rtype, lsnp, dbt, logflags, repflags) DB_ENV *dbenv; int eid; u_int32_t rtype; DB_LSN *lsnp; const DBT *dbt; u_int32_t logflags, repflags; { DB_REP *db_rep; REP *rep; DBT cdbt, scrap_dbt; REP_CONTROL cntrl; int ret; u_int32_t myflags, rectype; #ifdef DIAGNOSTIC DB_MSGBUF mb; #endif db_rep = dbenv->rep_handle; rep = db_rep->region; /* Set up control structure. 
*/ memset(&cntrl, 0, sizeof(cntrl)); if (lsnp == NULL) ZERO_LSN(cntrl.lsn); else cntrl.lsn = *lsnp; cntrl.rectype = rtype; cntrl.flags = logflags; cntrl.rep_version = DB_REPVERSION; cntrl.log_version = DB_LOGVERSION; cntrl.gen = rep->gen; memset(&cdbt, 0, sizeof(cdbt)); cdbt.data = &cntrl; cdbt.size = sizeof(cntrl); /* Don't assume the send function will be tolerant of NULL records. */ if (dbt == NULL) { memset(&scrap_dbt, 0, sizeof(DBT)); dbt = &scrap_dbt; } REP_PRINT_MESSAGE(dbenv, eid, &cntrl, "rep_send_message"); #ifdef REP_DIAGNOSTIC if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION) && rtype == REP_LOG) __rep_print_logmsg(dbenv, dbt, lsnp); #endif /* * There are several types of records: commit and checkpoint records * that affect database durability, regular log records that might * be buffered on the master before being transmitted, and control * messages which don't require the guarantees of permanency, but * should not be buffered. * * There are request records that can be sent anywhere, and there * are rerequest records that the app might want to send to the master. */ myflags = repflags; if (FLD_ISSET(logflags, DB_LOG_PERM)) myflags |= DB_REP_PERMANENT; else if (rtype != REP_LOG || FLD_ISSET(logflags, DB_LOG_RESEND)) myflags |= DB_REP_NOBUFFER; if (rtype == REP_LOG && !FLD_ISSET(logflags, DB_LOG_PERM)) { /* * Check if this is a log record we just read that * may need a DB_LOG_PERM. This is of type REP_LOG, * so we know that dbt is a log record. */ memcpy(&rectype, dbt->data, sizeof(rectype)); if (rectype == DB___txn_regop || rectype == DB___txn_ckp) F_SET(&cntrl, DB_LOG_PERM); } /* * We set the LSN above to something valid. Give the master the * actual LSN so that they can coordinate with permanent records from * the client if they want to. */ ret = dbenv->rep_send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags); /* * We don't hold the rep lock, so this could miscount if we race. 
* I don't think it's worth grabbing the mutex for that bit of * extra accuracy. */ if (ret == 0) rep->stat.st_msgs_sent++; else { rep->stat.st_msgs_send_failures++; RPRINT(dbenv, rep, (dbenv, &mb, "rep_send_function returned: %d", ret)); } return (ret); } #ifdef REP_DIAGNOSTIC /* * __rep_print_logmsg -- * This is a debugging routine for printing out log records that * we are about to transmit to a client. */ static void __rep_print_logmsg(dbenv, logdbt, lsnp) DB_ENV *dbenv; const DBT *logdbt; DB_LSN *lsnp; { /* Static structures to hold the printing functions. */ static int (**ptab)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *)) = NULL; size_t ptabsize = 0; if (ptabsize == 0) { /* Initialize the table. */ (void)__bam_init_print(dbenv, &ptab, &ptabsize); (void)__crdel_init_print(dbenv, &ptab, &ptabsize); (void)__db_init_print(dbenv, &ptab, &ptabsize); (void)__dbreg_init_print(dbenv, &ptab, &ptabsize); (void)__fop_init_print(dbenv, &ptab, &ptabsize); (void)__ham_init_print(dbenv, &ptab, &ptabsize); (void)__qam_init_print(dbenv, &ptab, &ptabsize); (void)__txn_init_print(dbenv, &ptab, &ptabsize); } (void)__db_dispatch(dbenv, ptab, ptabsize, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL); } #endif /* * __rep_new_master -- * Called after a master election to sync back up with a new master. * It's possible that we already know of this new master in which case * we don't need to do anything. * * This is written assuming that this message came from the master; we * need to enforce that in __rep_process_record, but right now, we have * no way to identify the master. 
* * PUBLIC: int __rep_new_master __P((DB_ENV *, REP_CONTROL *, int)); */ int __rep_new_master(dbenv, cntrl, eid) DB_ENV *dbenv; REP_CONTROL *cntrl; int eid; { DB_LOG *dblp; DB_LOGC *logc; DB_LSN first_lsn, lsn; DB_REP *db_rep; DBT dbt; LOG *lp; REGENV *renv; REGINFO *infop; REP *rep; int change, do_req, ret, t_ret; #ifdef DIAGNOSTIC DB_MSGBUF mb; #endif db_rep = dbenv->rep_handle; rep = db_rep->region; ret = 0; logc = NULL; REP_SYSTEM_LOCK(dbenv); __rep_elect_done(dbenv, rep); change = rep->gen != cntrl->gen || rep->master_id != eid; if (change) { RPRINT(dbenv, rep, (dbenv, &mb, "Updating gen from %lu to %lu from master %d", (u_long)rep->gen, (u_long)cntrl->gen, eid)); rep->gen = cntrl->gen; if (rep->egen <= rep->gen) rep->egen = rep->gen + 1; RPRINT(dbenv, rep, (dbenv, &mb, "Egen is %lu", (u_long)rep->egen)); rep->master_id = eid; rep->stat.st_master_changes++; rep->stat.st_startup_complete = 0; /* * If we're delaying client sync-up, we know we have a * new/changed master now, set flag indicating we are * actively delaying. */ if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT)) F_SET(rep, REP_F_DELAY); /* * If we are already locking out others, we're either * in the middle of sync-up recovery or internal init * when this newmaster comes in (we also lockout in * rep_start, but we cannot be racing that because we * don't allow rep_proc_msg when rep_start is going on). * * If we were in the middle of an internal initialization * and we've discovered a new master instead, clean up * our old internal init information. We need to clean * up any flags and unlock our lockout. 
*/ if (rep->in_recovery || F_ISSET(rep, REP_F_READY)) { (void)__rep_init_cleanup(dbenv, rep, DB_FORCE); F_CLR(rep, REP_F_RECOVER_MASK); rep->in_recovery = 0; F_CLR(rep, REP_F_READY); } F_SET(rep, REP_F_NOARCHIVE | REP_F_RECOVER_VERIFY); } REP_SYSTEM_UNLOCK(dbenv); dblp = dbenv->lg_handle; lp = dblp->reginfo.primary; LOG_SYSTEM_LOCK(dbenv); lsn = lp->lsn; LOG_SYSTEM_UNLOCK(dbenv); if (!change) { /* * If there wasn't a change, we might still have some * catching up or verification to do. */ ret = 0; MUTEX_LOCK(dbenv, rep->mtx_clientdb); do_req = __rep_check_doreq(dbenv, rep); if (F_ISSET(rep, REP_F_RECOVER_VERIFY)) { lsn = lp->verify_lsn; MUTEX_UNLOCK(dbenv, rep->mtx_clientdb); if (!F_ISSET(rep, REP_F_DELAY) && !IS_ZERO_LSN(lsn) && do_req) (void)__rep_send_message(dbenv, eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE); } else { MUTEX_UNLOCK(dbenv, rep->mtx_clientdb); if (log_compare(&lsn, &cntrl->lsn) < 0 && do_req) (void)__rep_send_message(dbenv, eid, REP_ALL_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE); REP_SYSTEM_LOCK(dbenv); F_CLR(rep, REP_F_NOARCHIVE); REP_SYSTEM_UNLOCK(dbenv); } return (ret); } /* * If the master changed, we need to start the process of * figuring out what our last valid log record is. However, * if both the master and we agree that the max LSN is 0,0, * then there is no recovery to be done. If we are at 0 and * the master is not, then we just need to request all the log * records from the master. */ if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) { /* * If we have no log, then we have no files to open * in recovery, but we've opened what we can, which * is none. Mark DBREP_OPENFILES here. */ empty: MUTEX_LOCK(dbenv, rep->mtx_clientdb); F_SET(db_rep, DBREP_OPENFILES); ZERO_LSN(lp->verify_lsn); REP_SYSTEM_LOCK(dbenv); F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK); REP_SYSTEM_UNLOCK(dbenv); if (!IS_INIT_LSN(cntrl->lsn)) { /* * We're making an ALL_REQ. 
But now that we've * cleared the flags, we're likely receiving new * log records from the master, resulting in a gap * immediately. So to avoid multiple data streams, * set the wait_recs value high now to give the master * a chance to start sending us these records before * the gap code re-requests the same gap. Wait_recs * will get reset once we start receiving these * records. */ lp->wait_recs = rep->max_gap; MUTEX_UNLOCK(dbenv, rep->mtx_clientdb); /* * Don't send the ALL_REQ if we're delayed. But we * check here, after lp->wait_recs is set up so that * when the app calls rep_sync, everything is ready * to go. */ if (!F_ISSET(rep, REP_F_DELAY)) (void)__rep_send_message(dbenv, eid, REP_ALL_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE); } else MUTEX_UNLOCK(dbenv, rep->mtx_clientdb); return (DB_REP_NEWMASTER); } memset(&dbt, 0, sizeof(dbt)); /* * If this client is farther ahead on the log file than the master, see * if there is any overlap in the logs. If not, the client is too * far ahead of the master and we cannot determine they're part of * the same replication group. */ if (cntrl->lsn.file < lsn.file) { if ((ret = __log_cursor(dbenv, &logc)) != 0) goto err; if ((ret = __log_c_get(logc, &first_lsn, &dbt, DB_FIRST)) != 0) goto err; if (cntrl->lsn.file < first_lsn.file) { __db_err(dbenv, "Client too far ahead of master; unable to join replication group"); ret = DB_REP_JOIN_FAILURE; goto err; } ret = __log_c_close(logc); logc = NULL; if (ret != 0) goto err; } if ((ret = __log_cursor(dbenv, &logc)) != 0) goto err; ret = __rep_log_backup(logc, &lsn); err: if (logc != NULL && (t_ret = __log_c_close(logc)) != 0 && ret == 0) ret = t_ret; if (ret == DB_NOTFOUND) { /* * If we don't have an identification record, we still might * have some log records but we're discarding them to sync * up with the master from the start. Therefore, * truncate our log and go to the no log case. */ INIT_LSN(lsn); RPRINT(dbenv, rep, (dbenv, &mb, "No commit or ckp found. 
Truncate log.")); (void)__log_vtruncate(dbenv, &lsn, &lsn, NULL); infop = dbenv->reginfo; renv = infop->primary; REP_SYSTEM_LOCK(dbenv); (void)time(&renv->rep_timestamp); REP_SYSTEM_UNLOCK(dbenv); goto empty; } /* * If we failed here, we need to clear the flags we may * have set above because we're not going to be setting * the verify_lsn. */ if (ret != 0) { REP_SYSTEM_LOCK(dbenv); F_CLR(rep, REP_F_RECOVER_MASK | REP_F_DELAY); REP_SYSTEM_UNLOCK(dbenv); return (ret); } /* * Finally, we have a record to ask for. */ MUTEX_LOCK(dbenv, rep->mtx_clientdb); lp->verify_lsn = lsn; lp->rcvd_recs = 0; lp->wait_recs = rep->request_gap; MUTEX_UNLOCK(dbenv, rep->mtx_clientdb); if (!F_ISSET(rep, REP_F_DELAY)) (void)__rep_send_message(dbenv, eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE); return (DB_REP_NEWMASTER); } /* * __rep_is_client * Used by other subsystems to figure out if this is a replication * client site. * * PUBLIC: int __rep_is_client __P((DB_ENV *)); */ int __rep_is_client(dbenv) DB_ENV *dbenv; { DB_REP *db_rep; REP *rep; if (!REP_ON(dbenv)) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; /* * Don't just return F_ISSET since that converts unsigned * into signed. */ return (F_ISSET(rep, REP_F_CLIENT) ? 1 : 0); } /* * __rep_noarchive * Used by log_archive to determine if it is okay to remove * log files. * * PUBLIC: int __rep_noarchive __P((DB_ENV *)); */ int __rep_noarchive(dbenv) DB_ENV *dbenv; { DB_REP *db_rep; REGENV *renv; REGINFO *infop; REP *rep; time_t timestamp; infop = dbenv->reginfo; renv = infop->primary; /* * This is tested before REP_ON below because we always need * to obey if any replication process has disabled archiving. * Everything is in the environment region that we need here. */ if (F_ISSET(renv, DB_REGENV_REPLOCKED)) { (void)time(×tamp); TIMESTAMP_CHECK(dbenv, timestamp, renv); /* * Check if we're still locked out after checking * the timestamp. 
*/ if (F_ISSET(renv, DB_REGENV_REPLOCKED)) return (EINVAL); } if (!REP_ON(dbenv)) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; if (F_ISSET(rep, REP_F_NOARCHIVE)) return (1); return (0); } /* * __rep_send_vote * Send this site's vote for the election. * * PUBLIC: void __rep_send_vote __P((DB_ENV *, DB_LSN *, int, int, int, * PUBLIC: u_int32_t, u_int32_t, int, u_int32_t)); */ void __rep_send_vote(dbenv, lsnp, nsites, nvotes, pri, tie, egen, eid, vtype) DB_ENV *dbenv; DB_LSN *lsnp; int eid, nsites, nvotes, pri; u_int32_t egen, tie, vtype; { DBT vote_dbt; REP_VOTE_INFO vi; memset(&vi, 0, sizeof(vi)); vi.egen = egen; vi.priority = pri; vi.nsites = nsites; vi.nvotes = nvotes; vi.tiebreaker = tie; memset(&vote_dbt, 0, sizeof(vote_dbt)); vote_dbt.data = &vi; vote_dbt.size = sizeof(vi); (void)__rep_send_message(dbenv, eid, vtype, lsnp, &vote_dbt, 0, 0); } /* * __rep_elect_done * Clear all election information for this site. Assumes the * caller hold the region mutex. * * PUBLIC: void __rep_elect_done __P((DB_ENV *, REP *)); */ void __rep_elect_done(dbenv, rep) DB_ENV *dbenv; REP *rep; { int inelect; u_int32_t endsec, endusec; #ifdef DIAGNOSTIC DB_MSGBUF mb; #else COMPQUIET(dbenv, NULL); #endif inelect = IN_ELECTION_TALLY(rep); F_CLR(rep, REP_F_EPHASE1 | REP_F_EPHASE2 | REP_F_TALLY); rep->sites = 0; rep->votes = 0; if (inelect) { if (rep->esec != 0) { __os_clock(dbenv, &endsec, &endusec); __db_difftime(rep->esec, endsec, rep->eusec, endusec, &rep->stat.st_election_sec, &rep->stat.st_election_usec); RPRINT(dbenv, rep, (dbenv, &mb, "Election finished in %u.%06u sec", rep->stat.st_election_sec, rep->stat.st_election_usec)); rep->esec = 0; rep->eusec = 0; } rep->egen++; } RPRINT(dbenv, rep, (dbenv, &mb, "Election done; egen %lu", (u_long)rep->egen)); } /* * __rep_grow_sites -- * Called to allocate more space in the election tally information. * Called with the rep mutex held. 
We need to call the region mutex, so * we need to make sure that we *never* acquire those mutexes in the * opposite order. * * PUBLIC: int __rep_grow_sites __P((DB_ENV *dbenv, int nsites)); */ int __rep_grow_sites(dbenv, nsites) DB_ENV *dbenv; int nsites; { REGENV *renv; REGINFO *infop; REP *rep; int nalloc, ret, *tally; rep = ((DB_REP *)dbenv->rep_handle)->region; /* * Allocate either twice the current allocation or nsites, * whichever is more. */ nalloc = 2 * rep->asites; if (nalloc < nsites) nalloc = nsites; infop = dbenv->reginfo; renv = infop->primary; MUTEX_LOCK(dbenv, renv->mtx_regenv); /* * We allocate 2 tally regions, one for tallying VOTE1's and * one for VOTE2's. Always grow them in tandem, because if we * get more VOTE1's we'll always expect more VOTE2's then too. */ if ((ret = __db_shalloc(infop, (size_t)nalloc * sizeof(REP_VTALLY), sizeof(REP_VTALLY), &tally)) == 0) { if (rep->tally_off != INVALID_ROFF) __db_shalloc_free( infop, R_ADDR(infop, rep->tally_off)); rep->tally_off = R_OFFSET(infop, tally); if ((ret = __db_shalloc(infop, (size_t)nalloc * sizeof(REP_VTALLY), sizeof(REP_VTALLY), &tally)) == 0) { /* Success */ if (rep->v2tally_off != INVALID_ROFF) __db_shalloc_free(infop, R_ADDR(infop, rep->v2tally_off)); rep->v2tally_off = R_OFFSET(infop, tally); rep->asites = nalloc; rep->nsites = nsites; } else { /* * We were unable to allocate both. So, we must * free the first one and reinitialize. If * v2tally_off is valid, it is from an old * allocation and we are clearing it all out due * to the error. */ if (rep->v2tally_off != INVALID_ROFF) __db_shalloc_free(infop, R_ADDR(infop, rep->v2tally_off)); __db_shalloc_free(infop, R_ADDR(infop, rep->tally_off)); rep->v2tally_off = rep->tally_off = INVALID_ROFF; rep->asites = 0; rep->nsites = 0; } } MUTEX_UNLOCK(dbenv, renv->mtx_regenv); return (ret); } /* * __env_rep_enter -- * * Check if we are in the middle of replication initialization and/or * recovery, and if so, disallow operations. 
If operations are allowed, * increment handle-counts, so that we do not start recovery while we * are operating in the library. * * PUBLIC: int __env_rep_enter __P((DB_ENV *, int)); */ int __env_rep_enter(dbenv, checklock) DB_ENV *dbenv; int checklock; { DB_REP *db_rep; REGENV *renv; REGINFO *infop; REP *rep; int cnt; time_t timestamp; /* Check if locks have been globally turned off. */ if (F_ISSET(dbenv, DB_ENV_NOLOCKING)) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; infop = dbenv->reginfo; renv = infop->primary; if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) { (void)time(×tamp); TIMESTAMP_CHECK(dbenv, timestamp, renv); /* * Check if we're still locked out after checking * the timestamp. */ if (F_ISSET(renv, DB_REGENV_REPLOCKED)) return (EINVAL); } REP_SYSTEM_LOCK(dbenv); for (cnt = 0; rep->in_recovery;) { REP_SYSTEM_UNLOCK(dbenv); if (FLD_ISSET(rep->config, REP_C_NOWAIT)) { __db_err(dbenv, "Operation locked out. Waiting for replication recovery to complete"); return (DB_REP_LOCKOUT); } __os_sleep(dbenv, 1, 0); REP_SYSTEM_LOCK(dbenv); if (++cnt % 60 == 0) __db_err(dbenv, "DB_ENV handle waiting %d minutes for replication recovery to complete", cnt / 60); } rep->handle_cnt++; REP_SYSTEM_UNLOCK(dbenv); return (0); } /* * __env_db_rep_exit -- * * Decrement handle count upon routine exit. * * PUBLIC: int __env_db_rep_exit __P((DB_ENV *)); */ int __env_db_rep_exit(dbenv) DB_ENV *dbenv; { DB_REP *db_rep; REP *rep; /* Check if locks have been globally turned off. */ if (F_ISSET(dbenv, DB_ENV_NOLOCKING)) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; REP_SYSTEM_LOCK(dbenv); rep->handle_cnt--; REP_SYSTEM_UNLOCK(dbenv); return (0); } /* * __db_rep_enter -- * Called in replicated environments to keep track of in-use handles * and prevent any concurrent operation during recovery. If checkgen is * non-zero, then we verify that the dbp has the same handle as the env. 
* * If return_now is non-zero, we'll return DB_DEADLOCK immediately, else we'll * sleep before returning DB_DEADLOCK. Without the sleep, it is likely * the application will immediately try again and could reach a retry * limit before replication has a chance to finish. The sleep increases * the probability that an application retry will succeed. * * PUBLIC: int __db_rep_enter __P((DB *, int, int, int)); */ int __db_rep_enter(dbp, checkgen, checklock, return_now) DB *dbp; int checkgen, checklock, return_now; { DB_ENV *dbenv; DB_REP *db_rep; REGENV *renv; REGINFO *infop; REP *rep; time_t timestamp; dbenv = dbp->dbenv; /* Check if locks have been globally turned off. */ if (F_ISSET(dbenv, DB_ENV_NOLOCKING)) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; infop = dbenv->reginfo; renv = infop->primary; if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) { (void)time(×tamp); TIMESTAMP_CHECK(dbenv, timestamp, renv); /* * Check if we're still locked out after checking * the timestamp. */ if (F_ISSET(renv, DB_REGENV_REPLOCKED)) return (EINVAL); } REP_SYSTEM_LOCK(dbenv); if (F_ISSET(rep, REP_F_READY)) { REP_SYSTEM_UNLOCK(dbenv); if (!return_now) __os_sleep(dbenv, 5, 0); return (DB_LOCK_DEADLOCK); } if (checkgen && dbp->timestamp != renv->rep_timestamp) { REP_SYSTEM_UNLOCK(dbenv); __db_err(dbenv, "%s %s", "replication recovery unrolled committed transactions;", "open DB and DBcursor handles must be closed"); return (DB_REP_HANDLE_DEAD); } rep->handle_cnt++; REP_SYSTEM_UNLOCK(dbenv); return (0); } /* * __op_rep_enter -- * * Check if we are in the middle of replication initialization and/or * recovery, and if so, disallow new multi-step operations, such as * transaction and memp gets. If operations are allowed, * increment the op_cnt, so that we do not start recovery while we have * active operations. 
* * PUBLIC: int __op_rep_enter __P((DB_ENV *)); */ int __op_rep_enter(dbenv) DB_ENV *dbenv; { DB_REP *db_rep; REP *rep; int cnt; /* Check if locks have been globally turned off. */ if (F_ISSET(dbenv, DB_ENV_NOLOCKING)) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; REP_SYSTEM_LOCK(dbenv); for (cnt = 0; F_ISSET(rep, REP_F_READY);) { REP_SYSTEM_UNLOCK(dbenv); if (FLD_ISSET(rep->config, REP_C_NOWAIT)) { __db_err(dbenv, "Operation locked out. Waiting for replication recovery to complete"); return (DB_REP_LOCKOUT); } __os_sleep(dbenv, 5, 0); cnt += 5; REP_SYSTEM_LOCK(dbenv); if (cnt % 60 == 0) __db_err(dbenv, "__op_rep_enter waiting %d minutes for op count to drain", cnt / 60); } rep->op_cnt++; REP_SYSTEM_UNLOCK(dbenv); return (0); } /* * __op_rep_exit -- * * Decrement op count upon transaction commit/abort/discard or * memp_fput. * * PUBLIC: int __op_rep_exit __P((DB_ENV *)); */ int __op_rep_exit(dbenv) DB_ENV *dbenv; { DB_REP *db_rep; REP *rep; /* Check if locks have been globally turned off. */ if (F_ISSET(dbenv, DB_ENV_NOLOCKING)) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; REP_SYSTEM_LOCK(dbenv); DB_ASSERT(rep->op_cnt > 0); rep->op_cnt--; REP_SYSTEM_UNLOCK(dbenv); return (0); } /* * __rep_get_gen -- * * Get the generation number from a replicated environment. * * PUBLIC: int __rep_get_gen __P((DB_ENV *, u_int32_t *)); */ int __rep_get_gen(dbenv, genp) DB_ENV *dbenv; u_int32_t *genp; { DB_REP *db_rep; REP *rep; db_rep = dbenv->rep_handle; rep = db_rep->region; REP_SYSTEM_LOCK(dbenv); if (rep->recover_gen > rep->gen) *genp = rep->recover_gen; else *genp = rep->gen; REP_SYSTEM_UNLOCK(dbenv); return (0); } /* * __rep_lockout -- * Coordinate with other threads in the library and active txns so * that we can run single-threaded, for recovery or internal backup. * Assumes the caller holds the region mutex. 
* * PUBLIC: int __rep_lockout __P((DB_ENV *, REP *, u_int32_t)); */ int __rep_lockout(dbenv, rep, msg_th) DB_ENV *dbenv; REP *rep; u_int32_t msg_th; { int wait_cnt; /* Phase 1: set REP_F_READY and wait for op_cnt to go to 0. */ F_SET(rep, REP_F_READY); for (wait_cnt = 0; rep->op_cnt != 0;) { REP_SYSTEM_UNLOCK(dbenv); __os_sleep(dbenv, 1, 0); #if defined(DIAGNOSTIC) || defined(CONFIG_TEST) if (++wait_cnt % 60 == 0) __db_err(dbenv, "Waiting for txn_cnt to run replication recovery/backup for %d minutes", wait_cnt / 60); #endif REP_SYSTEM_LOCK(dbenv); } /* * Phase 2: set in_recovery and wait for handle count to go * to 0 and for the number of threads in __rep_process_message * to go to 1 (us). */ rep->in_recovery = 1; for (wait_cnt = 0; rep->handle_cnt != 0 || rep->msg_th > msg_th;) { REP_SYSTEM_UNLOCK(dbenv); __os_sleep(dbenv, 1, 0); #ifdef DIAGNOSTIC if (++wait_cnt % 60 == 0) __db_err(dbenv, "Waiting for handle count to run replication recovery/backup for %d minutes", wait_cnt / 60); #endif REP_SYSTEM_LOCK(dbenv); } return (0); } /* * __rep_send_throttle - * Send a record, throttling if necessary. Callers of this function * will throttle - breaking out of their loop, if the repth->type field * changes from the normal message type to the *_MORE message type. * This function will send the normal type unless throttling gets invoked. * Then it sets the type field and sends the _MORE message. * * PUBLIC: int __rep_send_throttle __P((DB_ENV *, int, REP_THROTTLE *, * PUBLIC: u_int32_t)); */ int __rep_send_throttle(dbenv, eid, repth, flags) DB_ENV *dbenv; int eid; REP_THROTTLE *repth; u_int32_t flags; { DB_REP *db_rep; REP *rep; u_int32_t size, typemore; int check_limit; check_limit = repth->gbytes != 0 || repth->bytes != 0; /* * If we only want to do throttle processing and we don't have it * turned on, return immediately. 
*/ if (!check_limit && LF_ISSET(REP_THROTTLE_ONLY)) return (0); db_rep = dbenv->rep_handle; rep = db_rep->region; typemore = 0; if (repth->type == REP_LOG) typemore = REP_LOG_MORE; if (repth->type == REP_PAGE) typemore = REP_PAGE_MORE; DB_ASSERT(typemore != 0); /* * data_dbt.size is only the size of the log * record; it doesn't count the size of the * control structure. Factor that in as well * so we're not off by a lot if our log records * are small. */ size = repth->data_dbt->size + sizeof(REP_CONTROL); if (check_limit) { if (repth->lsn.offset == 28) { repth->type = typemore; goto send; } while (repth->bytes <= size) { if (repth->gbytes > 0) { repth->bytes += GIGABYTE; --(repth->gbytes); continue; } /* * We don't hold the rep mutex, * and may miscount. */ rep->stat.st_nthrottles++; repth->type = typemore; goto send; } repth->bytes -= size; } /* * Always send if it is typemore, otherwise send only if * REP_THROTTLE_ONLY is not set. */ send: if ((repth->type == typemore || !LF_ISSET(REP_THROTTLE_ONLY)) && (__rep_send_message(dbenv, eid, repth->type, &repth->lsn, repth->data_dbt, DB_LOG_RESEND, 0) != 0)) return (1); return (0); } #ifdef DIAGNOSTIC /* * PUBLIC: void __rep_print_message __P((DB_ENV *, int, REP_CONTROL *, char *)); */ void __rep_print_message(dbenv, eid, rp, str) DB_ENV *dbenv; int eid; REP_CONTROL *rp; char *str; { DB_MSGBUF mb; char *type; switch (rp->rectype) { case REP_ALIVE: type = "alive"; break; case REP_ALIVE_REQ: type = "alive_req"; break; case REP_ALL_REQ: type = "all_req"; break; case REP_BULK_LOG: type = "bulk_log"; break; case REP_BULK_PAGE: type = "bulk_page"; break; case REP_DUPMASTER: type = "dupmaster"; break; case REP_FILE: type = "file"; break; case REP_FILE_FAIL: type = "file_fail"; break; case REP_FILE_REQ: type = "file_req"; break; case REP_LOG: type = "log"; break; case REP_LOG_MORE: type = "log_more"; break; case REP_LOG_REQ: type = "log_req"; break; case REP_MASTER_REQ: type = "master_req"; break; case REP_NEWCLIENT: type = 
"newclient"; break; case REP_NEWFILE: type = "newfile"; break; case REP_NEWMASTER: type = "newmaster"; break; case REP_NEWSITE: type = "newsite"; break; case REP_PAGE: type = "page"; break; case REP_PAGE_FAIL: type = "page_fail"; break; case REP_PAGE_MORE: type = "page_more"; break; case REP_PAGE_REQ: type = "page_req"; break; case REP_REREQUEST: type = "rerequest"; break; case REP_UPDATE: type = "update"; break; case REP_UPDATE_REQ: type = "update_req"; break; case REP_VERIFY: type = "verify"; break; case REP_VERIFY_FAIL: type = "verify_fail"; break; case REP_VERIFY_REQ: type = "verify_req"; break; case REP_VOTE1: type = "vote1"; break; case REP_VOTE2: type = "vote2"; break; default: type = "NOTYPE"; break; } RPRINT(dbenv, ((REP *)((DB_REP *)(dbenv)->rep_handle)->region), (dbenv, &mb, "%s %s: gen = %lu eid %d, type %s, LSN [%lu][%lu]", dbenv->db_home, str, (u_long)rp->gen, eid, type, (u_long)rp->lsn.file, (u_long)rp->lsn.offset)); } #endif