#include #include #include #include #include #include #include #include #include #include #include #include #include #include "flow.h" #include "hash_ip.h" #include "hash_ip2.h" #include "fmt.h" /* /* XXX TODO /* flow_read_header needs to use readn() */ #define FSIZE ((FLOW_BUFS)*sizeof(struct flow_data)) /* /* function: flow_read /* /* flow_read_end must be called to dispose of /* resources used. /* /* flows are returned in host byte order /* */ struct flow_data *flow_read(fs) struct flow_stream *fs; { int n, err; extern int errno; /* compressed stream */ if (fs->flags & FS_FLAG_COMPRESS) { /* if first call, init zlib */ if (!(fs->flags & FS_FLAG_ZINIT)) { fs->zs.zalloc = (alloc_func)0; fs->zs.zfree = (free_func)0; fs->zs.opaque = (voidpf)0; if (inflateInit(&fs->zs) != Z_OK) { fprintf(stderr, "inflateInit(): failed\n"); return (struct flow_data*)0L; } fs->flags |= FS_FLAG_ZINIT; fs->zs.avail_out = sizeof (struct flow_data); fs->zs.next_out = (Bytef*)&fs->fdata[0]; } while (1) { /* XXX /* on last inflate, call inflateEnd and unset the ZINIT flag /* the check for read should probably go after the attempt /* to inflate /* /* on error, inconsistant state can be kept... */ /* /* if there's no data in the inflate buffer, do a read() */ if (!fs->zs.avail_in) { n = read(fs->fd, (char*)fs->z_buf, Z_BUFSIZE); if (!n) return (struct flow_data*)0L; /* XXX check for partial uncompressed flow */ if (n == -1) { fprintf(stderr,"read(): %s\n", strerror(errno)); return (struct flow_data*)0L; } fs->zs.next_in = (Bytef*)fs->z_buf; fs->zs.avail_in = n; } /* /* there's data in the compressed stream, inflate it */ err = inflate(&fs->zs, Z_PARTIAL_FLUSH); if ((err != Z_OK) && (err != Z_STREAM_END)) { fprintf(stderr, "inflate(): failed\n"); return (struct flow_data*)0L; } /* if this was a complete decompress, return the flow */ if (!fs->zs.avail_out) { fs->zs.avail_out = sizeof (struct flow_data); fs->zs.next_out = (Bytef*)&fs->fdata[0]; #if BYTE_ORDER == BIG_ENDIAN if (fs->fh.byte_order == FF_LITTLE_ENDIAN) swap_flow_data(&fs->fdata[0]); #endif /* BYTE_ORDER == BIG_ENDIAN */ #if BYTE_ORDER == LITTLE_ENDIAN if (fs->fh.byte_order == FF_BIG_ENDIAN) swap_flow_data(&fs->fdata[0]); #endif /* BYTE_ORDER == LITTLE_ENDIAN */ return &fs->fdata[0]; } } /* while 1 */ } /* compressed stream */ /* /* uncompressed stream */ if ((!fs->count) || (fs->next == fs->count)) { if (fs->leftover) { bcopy(&fs->fdata[fs->count], fs->fdata, fs->leftover); } n = read(fs->fd, (char*)fs->fdata+fs->leftover, (FLOW_BUFS-1)*sizeof(struct flow_data)); if ((n == -1) || (!n)) { if (fs->leftover) fprintf(stderr, "%d extra trailing bytes!\n", fs->leftover); return (struct flow_data*)0L; } n += fs->leftover; fs->count = n / sizeof(struct flow_data); fs->leftover = n % sizeof(struct flow_data); fs->next = 1; #if BYTE_ORDER == BIG_ENDIAN if (fs->fh.byte_order == FF_LITTLE_ENDIAN) swap_flow_data(&fs->fdata[0]); #endif /* BYTE_ORDER == BIG_ENDIAN */ #if BYTE_ORDER == LITTLE_ENDIAN if (fs->fh.byte_order == FF_BIG_ENDIAN) swap_flow_data(&fs->fdata[0]); #endif /* BYTE_ORDER == LITTLE_ENDIAN */ return &fs->fdata[0]; } #if BYTE_ORDER == BIG_ENDIAN if (fs->fh.byte_order == FF_LITTLE_ENDIAN) swap_flow_data(&fs->fdata[fs->next]); #endif /* BYTE_ORDER == BIG_ENDIAN */ #if BYTE_ORDER == LITTLE_ENDIAN if (fs->fh.byte_order == FF_BIG_ENDIAN) swap_flow_data(&fs->fdata[fs->next]); #endif /* BYTE_ORDER == LITTLE_ENDIAN */ return &fs->fdata[fs->next++]; } /* flow_read */ /* /* function: flow_read_end /* /* cleans up any resources used by flow_read */ int flow_read_end(fs) struct flow_stream *fs; { if (fs->flags & FS_FLAG_ZINIT) { inflateEnd(&fs->zs); fs->flags &= ~FS_FLAG_ZINIT; } return 0; } /* flow_read_end */ /* /* function: flow_write_end /* /* cleans up any resources used by flow_write */ int flow_write_end(fs) struct flow_stream *fs; { if (fs->flags & FS_FLAG_ZINIT) { deflateEnd(&fs->zs); fs->flags &= ~FS_FLAG_ZINIT; } return 0; } /* flow_write_end */ /* /* function: flow_write /* /* schedules the flow in fdata to be written to fd /* using the buffers in fs. fs must be zero'd /* before the inital call. If fs->flags has the /* FF_FLAG_COMPRESS bit set, then the flow is output /* in compressed format. /* /* flow_flush must be called after the last flow /* is issued to flow_write /* /* flow_write_end must be called to dispose of /* resources used. /* /* returns -1 for errors, else number of bytes written. /* */ int flow_write(fdata, fs) struct flow_stream *fs; struct flow_data *fdata; { int ret, nbytes; nbytes = 0; if (fs->flags & FS_FLAG_COMPRESS) { /* if first call, init zlib */ if (!(fs->flags & FS_FLAG_ZINIT)) { fs->zs.zalloc = (alloc_func)0; fs->zs.zfree = (free_func)0; fs->zs.opaque = (voidpf)0; if (deflateInit(&fs->zs, fs->z_level) != Z_OK) { fprintf(stderr, "deflateInit(): failed\n"); return -1; } fs->flags |= FS_FLAG_ZINIT; fs->zs.next_out = (Bytef*)&fs->z_buf; fs->zs.avail_out = Z_BUFSIZE; } fs->zs.next_in = (Bytef*)fdata; fs->zs.avail_in = sizeof(struct flow_data); while (1) { if (deflate(&fs->zs, Z_NO_FLUSH) != Z_OK) { fprintf(stderr, "deflate(): failed\n"); return -1; } /* need to flush */ if (!fs->zs.avail_out) { ret = writen(fs->fd, fs->z_buf, Z_BUFSIZE); nbytes += ret; if (ret == -1) return -1; fs->zs.next_out = (Bytef*)&fs->z_buf; fs->zs.avail_out = Z_BUFSIZE; } else break; } /* deflating */ } else { /* compressed */ /* /* not compressed */ bcopy(fdata, &fs->fdata[fs->count], sizeof (struct flow_data)); /* if the buffer if full, flush it */ if ((++ fs->count) == FLOW_BUFS) { fs->count = 0; ret = writen(fs->fd, fs->fdata, FSIZE); nbytes += ret; if (ret == -1) return -1; } } /* not compressed */ return nbytes; } /* flow_write */ /* /* function: flow_flush /* /* flushes the remaining data in fs to fd. /* for use after flow_write() /* /* it's safe to call this between flow_write()'s if the /* output is not compressed. It's not safe to call /* except when done writing if the stream is compressed. /* /* returns -1 for errors, else number of bytes written. */ int flow_flush(fs) struct flow_stream *fs; { int ret, err, nbytes;; nbytes = 0; if ((fs->flags & FS_FLAG_COMPRESS) && (fs->flags & FS_FLAG_ZINIT)) { fs->zs.avail_in = 0; while (1) { err = deflate(&fs->zs, Z_FINISH); /* if done compressing, do final write to disk */ if (err == Z_STREAM_END) break; /* if anything other than Z_OK, then it's an error */ if (err != Z_OK) { fprintf(stderr, "deflate(): failed\n"); return -1; } /* need to flush */ if (!fs->zs.avail_out) { ret = writen(fs->fd, fs->z_buf, Z_BUFSIZE); nbytes += ret; if (ret == -1) return -1; fs->zs.next_out = (Bytef*)&fs->z_buf; fs->zs.avail_out = Z_BUFSIZE; } else break; } /* while 1 */ ret = writen(fs->fd, &fs->z_buf, Z_BUFSIZE - fs->zs.avail_out); if (ret == -1) return ret; nbytes += ret; return nbytes; } else { ret = writen(fs->fd, &fs->fdata, fs->count*sizeof (struct flow_data)); fs->count = 0; if (ret == -1) return ret; nbytes += ret; return nbytes; } return 0; } /* flow_flush */ /* /* function: flow_read_header /* /* read a flow_header struct into /* the flow stream struct. /* /* the structure is returned in host byte order /* /* the byte_order element reflects the byte order of how the flows /* are stored. */ int flow_read_header(fs) struct flow_stream *fs; { int n, i; extern int errno; i = 0; while (1) { if ((n = read(fs->fd, (char*)&fs->fh+i, sizeof (struct flow_header))) == -1) { fprintf(stderr, "read(): %s\n", strerror(errno)); return -1; } if (n && (n != sizeof (struct flow_header))) { i += n; continue; } break; } if (n != sizeof (struct flow_header)) { fprintf(stderr, "Bad header size.\n"); return -1; } if ((fs->fh.magic1 != FF_MAGIC1) || (fs->fh.magic2 != FF_MAGIC2)) { fprintf(stderr, "Bad magic number.\n"); return -1; } if (fs->fh.s_version != 1) { fprintf(stderr, "Bad flow stream version (%d), expecting %d\n", (int)fs->fh.s_version, (int)FSTREAM_VERSION); } #if BYTE_ORDER == BIG_ENDIAN if (fs->fh.byte_order == FF_LITTLE_ENDIAN) { swap_flow_header(&fs->fh); fs->fh.byte_order = FF_LITTLE_ENDIAN; } #endif /* BYTE_ORDER == BIG_ENDIAN */ #if BYTE_ORDER == LITTLE_ENDIAN if (fs->fh.byte_order == FF_BIG_ENDIAN) { swap_flow_header(&fs->fh); fs->fh.byte_order = FF_BIG_ENDIAN; } #endif /* BYTE_ORDER == LITTLE_ENDIAN */ if ((fs->fh.d_version != 1) && (fs->fh.d_version != 5) && (fs->fh.d_version != FF_D_VERSION_UNKNOWN)) { fprintf(stderr, "Bad export format, expecting 1 or 5 got %u.\n", (u_int)fs->fh.d_version); return -1; } if (fs->fh.flags & FF_FLAG_COMPRESS) fs->flags |= FF_FLAG_COMPRESS; return 0; } /* flow_read_header */ /* From Stevens */ /* /* function: writen /* /* write()'s n bytes to fd. /* returns # of bytes written, or -1 for error */ int writen(fd, ptr, nbytes) register int fd; register char *ptr; register int nbytes; { int nleft, nwritten; nleft = nbytes; while (nleft > 0) { nwritten = write(fd, ptr, nleft); if (nwritten <= 0) return(nwritten); /* error */ nleft -= nwritten; ptr += nwritten; } return(nbytes - nleft); } /* writen */ /* /* function: readn /* /* read()'s n bytes from fd /* returns # of butes read, or -1 for error */ int readn(fd, ptr, nbytes) register int fd; register char *ptr; register int nbytes; { int nleft, nread; nleft = nbytes; while (nleft > 0) { nread = read(fd, ptr, nleft); if (nread < 0) return nread; else if (nread == 0) break; nleft -= nread; ptr += nread; } return (nbytes - nleft); } /* readn */ /* /* function: flow_print_header /* /* output's a flow_header struct to stdout in readable form */ int flow_print_header(fh, cc) struct flow_header *fh; char cc; { printf("%c\n%c capture hostname: %s\n", cc, cc, fh->hostname); printf("%c capture start: %s", cc, ctime((time_t*)&fh->start)); printf("%c compress: %s\n", cc, (fh->flags & FF_FLAG_COMPRESS) ? "on" : "off"); printf("%c byte order: ", cc); if (fh->byte_order == FF_LITTLE_ENDIAN) printf("little\n"); else if (fh->byte_order == FF_BIG_ENDIAN) printf("big\n"); printf("%c multiple pdu types: %s\n", cc, (fh->flags & FF_FLAG_MULT_PDU) ? "yes" : "no"); printf("%c stream version: %u\n", cc, (int)fh->s_version); printf("%c export version: %u\n", cc, (int)fh->d_version); printf("%c pdu drops : %lu\n", cc, (u_long)fh->pdu_drops); printf("%c pdu misordered: %lu\n", cc, (u_long)fh->pdu_misordered); if (fh->comments[0]) printf("%c comments: %s\n", cc, fh->comments); if (fh->flags & FF_FLAG_DONE) { printf("%c capture end: %s", cc, ctime((time_t*)&fh->end)); printf("%c capture nflows: %lu\n", cc, fh->nflows); } else printf("%c note, incomplete flow file\n", cc); printf("%c\n", cc); return 0; } /* flow_print_header */ /* /* function: profile_start /* /* initializes profile stats /* */ int profile_start(fp) struct flow_profile *fp; { bzero (fp, sizeof (struct flow_profile)); return gettimeofday(&fp->t0, (struct timezone*)0L); } /* profile_start */ /* /* function: profile_end /* /* prints timing stats to stdout. call profile_start first /* flow_profile.nflows must be set by caller */ int profile_end(prog, fp) struct flow_profile *fp; char *prog; { int ret; char fmt_buf[256]; u_long usec, sec; double fps; if ((ret = gettimeofday(&fp->t1, (struct timezone*)0L)) == -1) return -1; if ((ret = getrusage(RUSAGE_SELF, &fp->r0)) == -1) return -1; fmt_uint64(fmt_buf, fp->nflows, FMT_JUST_LEFT); usec = fp->r0.ru_utime.tv_usec + fp->r0.ru_stime.tv_usec; sec = fp->r0.ru_utime.tv_sec + fp->r0.ru_stime.tv_sec; if (usec > 1000000) usec -= 1000000, ++sec; fps = (double)fp->nflows / ((double)sec + ((double)usec/1000000)); fprintf(stderr, "%s: processed %s flows\n", prog, fmt_buf); fprintf(stderr, " sys: seconds=%lu.%-3.3lu flows/second=%f\n", sec, usec/1000, fps); if (fp->t1.tv_usec < fp->t0.tv_usec) fp->t1.tv_usec += 1000000, --fp->t1.tv_sec; usec = fp->t1.tv_usec - fp->t0.tv_usec; sec = fp->t1.tv_sec - fp->t0.tv_sec; fps = (double)fp->nflows / ((double)sec + ((double)usec/1000000)); fprintf(stderr, " wall: seconds=%lu.%-3.3lu flows/second=%f\n", sec, usec/1000, fps); return 0; } /* profile_end */ /* /* swap the byte order of a flow_pdu */ void swap_flow_pdu_v1(fpdu, byte_order) struct flow_pdu_v1 *fpdu; int byte_order; { int16 x; x = fpdu->count; #if BYTE_ORDER == LITTLE_ENDIAN if (byte_order == BIG_ENDIAN) { SWAPINT16(x); } #endif /* BYTE_ORDER == LITTLE_ENDIAN */ #if BYTE_ORDER == BIG_ENDIAN if (byte_order == LITTLE_ENDIAN) { SWAPINT16(x); } #endif /* BYTE_ORDER == BIG_ENDIAN */ SWAPINT16(fpdu->version); SWAPINT16(fpdu->count); SWAPINT32(fpdu->sysUpTime); SWAPINT32(fpdu->unix_secs); SWAPINT32(fpdu->unix_nsecs); for (--x; x >= 0; --x) { SWAPINT32(fpdu->records[x].srcaddr) SWAPINT32(fpdu->records[x].dstaddr) SWAPINT32(fpdu->records[x].nexthop) SWAPINT16(fpdu->records[x].input) SWAPINT16(fpdu->records[x].output) SWAPINT32(fpdu->records[x].dPkts) SWAPINT32(fpdu->records[x].dOctets) SWAPINT32(fpdu->records[x].First) SWAPINT32(fpdu->records[x].Last) SWAPINT16(fpdu->records[x].dstport) SWAPINT16(fpdu->records[x].srcport) } } /* swap_flow_pdu_v1 */ /* /* swap the byte order of a flow_pdu */ void swap_flow_pdu_v5(fpdu, byte_order) struct flow_pdu_v5 *fpdu; int byte_order; { int16 x; x = fpdu->count; #if BYTE_ORDER == LITTLE_ENDIAN if (byte_order == BIG_ENDIAN) { SWAPINT16(x); } #endif /* BYTE_ORDER == LITTLE_ENDIAN */ #if BYTE_ORDER == BIG_ENDIAN if (byte_order == LITTLE_ENDIAN) { SWAPINT16(x); } #endif /* BYTE_ORDER == BIG_ENDIAN */ SWAPINT16(fpdu->version); SWAPINT16(fpdu->count); SWAPINT32(fpdu->sysUpTime); SWAPINT32(fpdu->unix_secs); SWAPINT32(fpdu->unix_nsecs); SWAPINT32(fpdu->flow_sequence); SWAPINT32(fpdu->reserved); for (--x; x >= 0; --x) { SWAPINT32(fpdu->records[x].srcaddr) SWAPINT32(fpdu->records[x].dstaddr) SWAPINT32(fpdu->records[x].nexthop) SWAPINT16(fpdu->records[x].input) SWAPINT16(fpdu->records[x].output) SWAPINT32(fpdu->records[x].dPkts) SWAPINT32(fpdu->records[x].dOctets) SWAPINT32(fpdu->records[x].First) SWAPINT32(fpdu->records[x].Last) SWAPINT16(fpdu->records[x].dstport) SWAPINT16(fpdu->records[x].srcport) SWAPINT16(fpdu->records[x].src_as) SWAPINT16(fpdu->records[x].dst_as) SWAPINT16(fpdu->records[x].drops) } } /* swap_flow_pdu_v5 */ /* /* swap the byte order of a flow_header */ void swap_flow_header(fh) struct flow_header *fh; { if (fh->byte_order == FF_LITTLE_ENDIAN) fh->byte_order = FF_BIG_ENDIAN; else fh->byte_order = FF_LITTLE_ENDIAN; SWAPINT16(fh->d_version); SWAPINT32(fh->start); SWAPINT32(fh->end); SWAPINT32(fh->flags); SWAPINT32(fh->rotation); SWAPINT32(fh->nflows); } /* /* swap the byte order of a flow_data */ void swap_flow_data(fdata) struct flow_data *fdata; { SWAPINT32(fdata->unix_secs); SWAPINT32(fdata->unix_msecs); SWAPINT32(fdata->srcaddr); SWAPINT32(fdata->dstaddr); SWAPINT32(fdata->nexthop); SWAPINT16(fdata->input); SWAPINT16(fdata->output); SWAPINT32(fdata->dPkts); SWAPINT32(fdata->dOctets); SWAPINT32(fdata->First); SWAPINT32(fdata->Last); SWAPINT16(fdata->srcport); SWAPINT16(fdata->dstport); SWAPINT16(fdata->pad); SWAPINT32(fdata->src_as); SWAPINT32(fdata->dst_as); SWAPINT32(fdata->drops); } /* swap_flow_data */ /* /* swap the byte order of a flow_pdu, but only the header */ void swap_flow_pdu2_v1(fpdu, byte_order) struct flow_pdu_v1 *fpdu; int byte_order; { int16 x; x = fpdu->count; #if BYTE_ORDER == LITTLE_ENDIAN if (byte_order == BIG_ENDIAN) { SWAPINT16(x); } #endif /* BYTE_ORDER == LITTLE_ENDIAN */ #if BYTE_ORDER == BIG_ENDIAN if (byte_order == LITTLE_ENDIAN) { SWAPINT16(x); } #endif /* BYTE_ORDER == BIG_ENDIAN */ SWAPINT16(fpdu->version); SWAPINT16(fpdu->count); SWAPINT32(fpdu->sysUpTime); SWAPINT32(fpdu->unix_secs); SWAPINT32(fpdu->unix_nsecs); } /* swap_flow_pdu2_v1 */ /* /* swap the byte order of a flow_pdu, but only the header */ void swap_flow_pdu2_v5(fpdu, byte_order) struct flow_pdu_v5 *fpdu; int byte_order; { int16 x; x = fpdu->count; #if BYTE_ORDER == LITTLE_ENDIAN if (byte_order == BIG_ENDIAN) { SWAPINT16(x); } #endif /* BYTE_ORDER == LITTLE_ENDIAN */ #if BYTE_ORDER == BIG_ENDIAN if (byte_order == LITTLE_ENDIAN) { SWAPINT16(x); } #endif /* BYTE_ORDER == BIG_ENDIAN */ SWAPINT16(fpdu->version); SWAPINT16(fpdu->count); SWAPINT32(fpdu->sysUpTime); SWAPINT32(fpdu->unix_secs); SWAPINT32(fpdu->unix_nsecs); SWAPINT32(fpdu->flow_sequence); SWAPINT32(fpdu->reserved); } /* swap_flow_pdu2_v5 */