/*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
 *
 * librsync -- library for network deltas
 * $Id: delta.c,v 1.36 2004/09/10 02:48:57 mbp Exp $
 *
 * Copyright (C) 2000, 2001 by Martin Pool <mbp@samba.org>
 * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au> 
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

                                /*
                                 | Let's climb to the TOP of that
                                 | MOUNTAIN and think about STRIP
                                 | MINING!!
                                 */


/*
 * delta.c -- Generate in streaming mode an rsync delta given a set of
 * signatures, and a new file.
 *
 * The size of blocks for signature generation is determined by the
 * block size in the incoming signature.
 *
 * To calculate a signature, we need to be able to see at least one
 * block of the new file at a time.  Once we have that, we calculate
 * its weak signature, and see if there is any block in the signature
 * hash table that has the same weak sum.  If there is one, then we
 * also compute the strong sum of the new block, and cross check that.
 * If they're the same, then we can assume we have a match.
 *
 * The final block of the file has to be handled a little differently,
 * because it may be a short match.  Short blocks in the signature
 * don't include their length -- we just allow for the final short
 * block of the file to match any block in the signature, and if they
 * have the same checksum we assume they must have the same length.
 * Therefore, when we emit a COPY command, we have to send it with a
 * length that is the same as the block matched, and not the block
 * length from the signature.
 */

/*
 * Profiling results as of v1.26, 2001-03-18:
 *
 * If everything matches, then we spend almost all our time in
 * rs_mdfour64 and rs_weak_sum, which is unavoidable and therefore a
 * good profile.
 *
 * If nothing matches, it is not so good.
 */


#include <config.h>

#include <assert.h>
#include <stdlib.h>
#include <stdio.h>

#include "librsync.h"
#include "emit.h"
#include "stream.h"
#include "util.h"
#include "sumset.h"
#include "job.h"
#include "trace.h"
#include "search.h"
#include "types.h"
#include "rollsum.h"


/**
 * 2002-06-26: Donovan Baarda
 * 
 * The following is based entirely on pysync. It is much cleaner than the
 * previous incarnation of this code. It is slightly complicated because in
 * this case the output can block, so the main delta loop needs to stop
 * when this happens.
 * 
 * In pysync a 'last' attribute is used to hold the last miss or match for
 * extending if possible. In this code, basis_len and scoop_pos are used 
 * instead of 'last'. When basis_len > 0, last is a match. When basis_len =
 * 0 and scoop_pos is > 0, last is a miss. When both are 0, last is None
 * (ie, nothing).
 * 
 * Pysync is also slightly different in that a 'flush' method is available
 * to force output of accumulated data. This 'flush' is used to finalise
 * delta calculation. In librsync input is terminated with an eof flag on
 * the input stream. I have structured this code similar to pysync with a
 * separate flush function that is used when eof is reached. This allows
 * for a flush style API if one is ever needed. Note that flush in pysync
 * can be used for more than just terminating delta calculation, so a flush
 * based API can in some ways be more flexible...
 * 
 * The input data is first scanned, then processed. Scanning identifies
 * input data as misses or matches, and emits the instruction stream.
 * Processing the data consumes it off the input scoop and outputs the
 * processed miss data into the tube.
 * 
 * The scoop contains all data yet to be processed. The scoop_pos is an
 * index into the scoop that indicates the point scanned to. As data is
 * scanned, scoop_pos is incremented. As data is processed, it is removed
 * from the scoop and scoop_pos adjusted. Everything gets complicated
 * because the tube can block. When the tube is blocked, no data can be
 * processed.
 * 
 */

/* used by rdiff, but now redundant */
int rs_roll_paranoia = 0;

/* State functions of the delta job, installed through job->statefn. */
static rs_result rs_delta_s_scan(rs_job_t *job);
static rs_result rs_delta_s_flush(rs_job_t *job);
static rs_result rs_delta_s_end(rs_job_t *job);

/* Pull pending input into the scoop. */
void rs_getinput(rs_job_t *job);

/*
 * Scanning helpers.
 *
 * These are declared `static inline` rather than plain `inline`.  Under
 * C99/C11 rules a function whose file-scope declarations are all `inline`
 * without `extern` only has an inline definition, which does not provide
 * an external definition; any call the compiler chooses not to inline then
 * fails at link time with an undefined reference.  Giving the first
 * declaration internal linkage avoids that, and the later `inline`
 * definitions in this file inherit the internal linkage from it.
 *
 * NOTE(review): this assumes no other translation unit calls these
 * helpers -- confirm before release.
 */
static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len);
static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len);
static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len);
static inline rs_result rs_appendflush(rs_job_t *job);
static inline rs_result rs_processmatch(rs_job_t *job);
static inline rs_result rs_processmiss(rs_job_t *job);

/**
 * \brief Scan state: classify buffered input as matches or misses.
 *
 * Each call pulls pending input into the scoop, drains the tube, and then
 * scans for as long as output is not blocked and the scoop holds more
 * than one full block beyond scoop_pos.
 *
 * \return RS_RUNNING after switching to the flush state once the input
 * stream has reached eof, RS_BLOCKED when more input is needed, or the
 * non-RS_DONE result that stopped the scan.
 */
static rs_result rs_delta_s_scan(rs_job_t *job)
{
    rs_result      result;

    rs_job_check(job);
    /* top up the scoop, then push out whatever the tube still holds */
    rs_getinput(job);
    result = rs_tube_catchup(job);

    for (;;) {
        rs_long_t      match_pos;
        size_t         match_len;

        /* stop scanning as soon as output is blocked ... */
        if (result != RS_DONE) {
            break;
        }
        /* ... or when the byte just past the current block is not in the
         * scoop yet; it is needed to roll the weak sum forward */
        if ((job->scoop_pos + job->block_len) >= job->scoop_avail) {
            break;
        }

        if (rs_findmatch(job, &match_pos, &match_len)) {
            /* record the hit; the next block starts with a fresh weak sum */
            result = rs_appendmatch(job, match_pos, match_len);
            RollsumInit(&job->weak_sum);
            continue;
        }

        /* no hit: slide the window one byte and record that byte as a miss */
        RollsumRotate(&job->weak_sum,job->scoop_next[job->scoop_pos],
                      job->scoop_next[job->scoop_pos+job->block_len]);
        result = rs_appendmiss(job, 1);
        if (rs_roll_paranoia) {
            /* recompute the sum from scratch and compare with the rolled one */
            Rollsum        check;

            RollsumInit(&check);
            RollsumUpdate(&check, job->scoop_next + job->scoop_pos,
                          job->block_len);
            if (RollsumDigest(&check) != RollsumDigest(&job->weak_sum)) {
                rs_fatal("mismatch between rolled sum %#x and check %#x",
                         (int)RollsumDigest(&job->weak_sum),
                         (int)RollsumDigest(&check));
            }
        }
    }

    /* the scan stopped because something other than lack of data happened */
    if (result != RS_DONE) {
        return result;
    }
    /* not at eof yet: we are waiting for more input */
    if (!job->stream->eof_in) {
        return RS_BLOCKED;
    }
    /* eof reached: the remaining fragment is handled by the flush state */
    job->statefn = rs_delta_s_flush;
    return RS_RUNNING;
}


/**
 * \brief Flush state: scan everything that is left in the scoop.
 *
 * rs_delta_s_scan installs this state once the input stream reports eof.
 * Unlike the scan state it keeps going until scoop_pos reaches
 * scoop_avail, shrinking the weak sum window by one byte per miss.
 *
 * \return RS_RUNNING once the final flush completed, otherwise the
 * non-RS_DONE result that interrupted the work.
 */
static rs_result rs_delta_s_flush(rs_job_t *job)
{
    rs_result      result;

    rs_job_check(job);
    /* top up the scoop, then push out whatever the tube still holds */
    rs_getinput(job);
    result = rs_tube_catchup(job);

    for (;;) {
        rs_long_t      match_pos;
        size_t         match_len;

        /* stop when output is blocked or no unscanned data remains */
        if (result != RS_DONE) {
            break;
        }
        if (job->scoop_pos >= job->scoop_avail) {
            break;
        }

        if (rs_findmatch(job, &match_pos, &match_len)) {
            /* record the hit; the next block starts with a fresh weak sum */
            result = rs_appendmatch(job, match_pos, match_len);
            RollsumInit(&job->weak_sum);
            continue;
        }

        /* no hit: drop the leading byte from the sum and record a miss */
        RollsumRollout(&job->weak_sum,job->scoop_next[job->scoop_pos]);
        rs_trace("block reduced to %d", (int)job->weak_sum.count);
        result = rs_appendmiss(job, 1);
    }

    if (result != RS_DONE) {
        return result;
    }

    /* emit whatever is still accumulating and hand over to the end state;
     * the end state is installed whatever the flush returns */
    result = rs_appendflush(job);
    job->statefn = rs_delta_s_end;
    if (result != RS_DONE) {
        return result;
    }
    return RS_RUNNING;
}


/**
 * \brief Final state: emit the end command and report completion.
 */
static rs_result rs_delta_s_end(rs_job_t *job)
{
    const rs_result finished = RS_DONE;

    rs_emit_end_cmd(job);
    return finished;
}


/**
 * Read input into the scoop whenever rs_scoop_total_avail() reports more
 * data than scoop_avail currently accounts for.
 */
void rs_getinput(rs_job_t *job)
{
    const size_t total = rs_scoop_total_avail(job);

    if (total > job->scoop_avail) {
        rs_scoop_input(job, total);
    }
}

        
/**
 * Look for a signature block matching the data at scoop_pos.
 *
 * When no weak sum is being carried (its count is zero) one is computed
 * over min(block_len, data remaining in the scoop) bytes; otherwise the
 * carried sum is used and its count gives the candidate length.  The
 * length is stored in *match_len and the lookup is delegated to
 * rs_search_for_block(), which receives match_pos to fill in.
 *
 * Note that this routine could be modified to do xdelta style matches that
 * would extend matches past block boundaries by matching backwards and
 * forwards beyond the block boundaries. Extending backwards would require
 * decrementing scoop_pos as appropriate.
 *
 * \return the result of rs_search_for_block(); callers treat non-zero as
 * a match.
 */
inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len) {
    if (job->weak_sum.count != 0) {
        /* a sum is already being carried; its count is the length */
        *match_len = job->weak_sum.count;
    } else {
        /* no sum yet: build one over at most one block of scoop data */
        size_t span = job->scoop_avail - job->scoop_pos;

        if (span > job->block_len) {
            span = job->block_len;
        }
        *match_len = span;
        RollsumUpdate(&job->weak_sum, job->scoop_next + job->scoop_pos, *match_len);
        rs_trace("calculate weak sum from scratch length %d",(int)job->weak_sum.count);
    }

    return rs_search_for_block(RollsumDigest(&job->weak_sum),
                               job->scoop_next + job->scoop_pos,
                               *match_len,
                               job->signature,
                               &job->stats,
                               match_pos);
}


/**
 * Add a match of match_len bytes at basis offset match_pos to the delta.
 *
 * A match that starts exactly where the pending match ends is merged into
 * it; in every other case whatever is pending is flushed first and this
 * match becomes the new pending one.  scoop_pos is then moved past the
 * matched data, and the data is processed off the scoop unless the flush
 * did not return RS_DONE.
 */
inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len)
{
    rs_result result = RS_DONE;
    /* true when a match is pending and this one is contiguous with it */
    const int extends_last =
        job->basis_len && ((job->basis_pos + job->basis_len) == match_pos);

    if (!extends_last) {
        /* flush the pending miss/match and start a new pending match */
        result = rs_appendflush(job);
        job->basis_pos = match_pos;
        job->basis_len = match_len;
    } else {
        job->basis_len += match_len;
    }

    /* scoop_pos must point at the next unscanned byte */
    job->scoop_pos += match_len;

    /* the scoop can only be consumed while output is not blocked */
    if (result != RS_DONE) {
        return result;
    }
    return rs_processmatch(job);
}


/**
 * Add miss_len bytes of unmatched data to the delta.
 *
 * The pending item is flushed first when it is a match, or when the
 * pending miss has already grown to rs_outbuflen bytes, which keeps the
 * amount of literal data accumulated in memory bounded.  Otherwise the
 * miss simply extends the pending one.
 *
 * \return RS_DONE when nothing had to be flushed, else the result of
 * rs_appendflush().
 */
inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
{
    const int must_flush =
        job->basis_len || (job->scoop_pos >= rs_outbuflen);
    rs_result result;

    if (must_flush) {
        result = rs_appendflush(job);
    } else {
        result = RS_DONE;
    }

    /* the miss bytes now count as scanned */
    job->scoop_pos += miss_len;
    return result;
}


/**
 * Emit the pending hit or miss, if any, into the delta.
 *
 * A pending match (basis_len non-zero) becomes a copy command; otherwise
 * pending miss data (scoop_pos non-zero) becomes a literal command.
 *
 * \return RS_DONE when nothing was pending, else the result of processing
 * the flushed data off the scoop.
 */
inline rs_result rs_appendflush(rs_job_t *job)
{
    if (!job->basis_len) {
        /* no pending match; without pending miss data there is nothing to do */
        if (!job->scoop_pos) {
            return RS_DONE;
        }
        rs_trace("got %ld bytes of literal data", (long) job->scoop_pos);
        rs_emit_literal_cmd(job, job->scoop_pos);
        return rs_processmiss(job);
    }

    /* pending match: emit it and clear basis_len to forget it */
    rs_trace("matched " PRINTF_FORMAT_U64 " bytes at " PRINTF_FORMAT_U64 "!",
             PRINTF_CAST_U64(job->basis_len),
             PRINTF_CAST_U64(job->basis_pos));
    rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
    job->basis_len = 0;
    return rs_processmatch(job);
}


/**
 * Consume matched data from the scoop.
 *
 * The first scoop_pos bytes at scoop_next are match data.  They are
 * dropped by advancing scoop_next and shrinking scoop_avail, and
 * scoop_pos is reset so that it again refers to the next unscanned byte.
 * Pending output is then pushed with rs_tube_catchup(), whose result is
 * returned.
 *
 * In the future this could be used for something like context compressing
 * of miss data.
 */
inline rs_result rs_processmatch(rs_job_t *job)
{
    /* step over the matched bytes ... */
    job->scoop_next += job->scoop_pos;
    job->scoop_avail -= job->scoop_pos;
    /* ... so the scan position restarts at the new head of the scoop */
    job->scoop_pos = 0;

    return rs_tube_catchup(job);
}
    
/**
 * Consume miss data from the scoop.
 *
 * The first scoop_pos bytes at scoop_next are miss data.  rs_tube_copy()
 * queues them for copying from the scoop into the output, scoop_pos is
 * reset, and rs_tube_catchup() performs the copy.  The copy automatically
 * removes the data from the scoop, but it can block; while it is blocked
 * scoop_pos does not point at legitimate data, so scanning cannot proceed
 * either.
 *
 * In the future this could compress miss data before outputting it.
 *
 * \return the result of rs_tube_catchup().
 */
inline rs_result rs_processmiss(rs_job_t *job)
{
    rs_result result;

    rs_tube_copy(job, job->scoop_pos);
    job->scoop_pos = 0;
    result = rs_tube_catchup(job);
    return result;
}


/**
 * \brief State function that does a slack delta containing only
 * literal data to recreate the input.
 *
 * All input currently available is emitted as one literal command.  With
 * no input available the state either moves on to the end state (input is
 * ending) or reports that it is blocked.
 */
static rs_result rs_delta_s_slack(rs_job_t *job)
{
    const size_t pending = job->stream->avail_in;

    if (pending == 0) {
        /* nothing to copy right now */
        if (!rs_job_input_is_ending(job)) {
            return RS_BLOCKED;
        }
        job->statefn = rs_delta_s_end;
        return RS_RUNNING;
    }

    rs_trace("emit slack delta for " PRINTF_FORMAT_U64
             " available bytes", PRINTF_CAST_U64(pending));
    rs_emit_literal_cmd(job, pending);
    rs_tube_copy(job, pending);
    return RS_RUNNING;
}


/**
 * State function for writing out the header of the encoding job.
 *
 * After the header is emitted the next state is chosen: slack deltas when
 * the block length is zero, otherwise scanning, which requires a loaded
 * signature.
 *
 * \return RS_RUNNING, or RS_PARAM_ERROR when a non-zero block length is
 * set but no signature is loaded.
 */
static rs_result rs_delta_s_header(rs_job_t *job)
{
    rs_emit_delta_header(job);

    if (!job->block_len) {
        rs_trace("block length is zero for this delta; "
                 "therefore using slack deltas");
        job->statefn = rs_delta_s_slack;
        return RS_RUNNING;
    }

    if (!job->signature) {
        rs_error("no signature is loaded into the job");
        return RS_PARAM_ERROR;
    }

    job->statefn = rs_delta_s_scan;
    return RS_RUNNING;
}


/**
 * Prepare to compute a streaming delta.
 *
 * \param sig Signature to compute the delta against.  It is dereferenced
 * unconditionally and stored in the job.
 *
 * \return A new delta job whose first state writes the delta header, or
 * NULL when the signature carries a negative block_len or a
 * strong_sum_len outside 0..RS_MD4_LENGTH.
 */
rs_job_t *rs_delta_begin(rs_signature_t *sig)
{
    rs_job_t *job;

    /* Reject an unusable signature before allocating the job.  Checking
     * after rs_job_new() would mean each error return leaks the job.
     *
     * NOTE(review): this assumes the job's block_len / strong_sum_len
     * fields have the same type as the signature's, so that validating
     * before the copy is equivalent to validating after it -- confirm
     * against job.h and sumset.h. */
    if (sig->block_len < 0) {
        rs_log(RS_LOG_ERR, "unreasonable block_len %d in signature",
               (int) sig->block_len);
        return NULL;
    }

    if (sig->strong_sum_len < 0  ||  sig->strong_sum_len > RS_MD4_LENGTH) {
        rs_log(RS_LOG_ERR, "unreasonable strong_sum_len %d in signature",
               (int) sig->strong_sum_len);
        return NULL;
    }

    job = rs_job_new("delta", rs_delta_s_header);
    job->signature = sig;
    job->block_len = sig->block_len;
    job->strong_sum_len = sig->strong_sum_len;

    /* scanning starts without a carried weak sum */
    RollsumInit(&job->weak_sum);

    return job;
}




/* syntax highlighted by Code2HTML, v. 0.9.1 */