#!/usr/bin/python # file tailer/colorer. # # by matt hellige (matt@immute.net) # # this utility is derived in part from colortail-0.3.0 by joakim andersson # (pt98jan@student.hk-r.se). it is an improvement on that work in several # areas: # - it is roughly 500 lines of code, rather than roughly 3500. # - it does not dump core when it encounters long lines. # - the regexp matching process is simpler and, in my opinion, more intuitive. # - there is only one config file, which contains coloring rules for an # arbitrary number of files. # - my coloring algorithm only generates escape codes when the color actually # needs changing, rather than around every single character. # - by virtue of being written in python, it is less platform-dependent. # - i support pre-coloring filters. # please see the file LICENSE for copyright info, etc... share and enjoy! import getopt import string import time import re import sys import stat import os import threading import signal # config file location. this should be made platform-independent... if os.environ.has_key('HOME'): config_file = os.environ['HOME']+os.sep+'.mtailrc' else: config_file = None # version number version_number = "1.1.1" # stuff to make sure that we print banners correctly when tailing multiple # files. we need to lock around the printing of each line to guarantee that # each line gets a banner when necessary. lock = threading.Lock() event = threading.Event() last_filename = None multiple = 0 verbose = 0 silent = 0 active = 0 # sets up the interrupt handler, calls processargs(), sets up and starts the # tailers. def main(): global multiple global active print signal.signal(signal.SIGINT, handleInterrupt) initial_lines, filenames, follow = processargs() config = ConfigFile(config_file) config.loadconfig() if len(filenames) > 1: multiple = 1 for filename in filenames: lock.acquire() active = active + 1 TailerThread(initial_lines, filename, follow, config).start() lock.release() while 1: # it seems that there just needs to be some number here, or it ignores # signals. PYTHON BUG! event.wait(10000) if event.isSet(): sys.exit(0) # signal handler for keyboard interrupts... just exits. def handleInterrupt(num, frame): sys.exit(0) # a simple thread that gets a config for a filename, initializes a tailer, # and loops. class TailerThread(threading.Thread): def __init__(self, initial_lines, filename, follow, config): threading.Thread.__init__(self) self.setDaemon(1) self.initial_lines = initial_lines self.filename = filename self.follow = follow self.config = config def run(self): global active try: self.do_run() except Exception: pass lock.acquire() active = active - 1 if active == 0: event.set() lock.release() def do_run(self): global last_filename if self.filename != None: ansi, colors, filter = self.config.getconfig(self.filename[string\ .rfind(self.filename, os.sep)+1:]) tailer = TailFile(self.filename, self.follow) tailer.seek_lines(self.initial_lines) else: if not multiple and not silent: print '*** reading stdin, will just follow...' print self.filename = 'standard input' ansi, colors, filter = self.config.getstdinconfig() tailer = sys.stdin while 1: line = tailer.readline() if not line: break if line[len(line)-1] == '\n': line = line[:len(line)-1] line = filter(line) lock.acquire() try: if (multiple or verbose) and (not silent) \ and last_filename != self.filename: last_filename = self.filename print print colorize(ansi, colors, '==> %s <==' % self.filename) print colorize(ansi, colors, line) except Exception: print ansi['reset'] sys.exit(1) sys.stdout.flush() lock.release() # processes the command line arguments. def processargs(): global verbose global silent global config_file filenames = [] lines = None follow = 0 try: opts, pargs = getopt.getopt(sys.argv[1:], 'fn:?hvq', ["follow", "lines=", "help", "verbose", "quiet", "silent", "config="]) except getopt.error: print 'unrecognized option!' print usage() sys.exit(1) for opt in opts: if opt[0] in ['-n', '--lines']: lines = opt[1] elif opt[0] in ['-f', '--follow']: follow = 1 elif opt[0] in ['-?', '-h', '--help']: usage() sys.exit(1) elif opt[0] in ['-v', '--verbose']: verbose = 1 elif opt[0] in ['-q', '--quiet', '--silent']: silent = 1 elif opt[0] in ['--config']: config_file = opt[1] if lines == None: lines = 10 for filename in pargs: if filename == '-': filenames.append(None) else: filenames.append(filename) if len(filenames) == 0: filenames.append(None) return (int(lines), filenames, follow) # prints a usage message. def usage(): global version_number print 'mtail '+version_number+' by matt hellige (matt@immute.net)' print print 'usage: '+sys.argv[0]+' [option]... []...' print ' -?, -h, --help print this usage info and exit' print ' -f, --follow output appended data as the file grows' print ' -n, --lines=N output the last N lines, instead of the last 10' print ' -v, --verbose always output headers giving file names' print ' -q, --quiet, --silent never output headers giving file names' print ' --config=FILE use config file FILE instead of ~/.mtailrc' print print 'with more than one filename, mtail will precede each chunk from' print 'each with a header giving the file name. if no filenames are given' print 'or a filename is -, standard input is used, -f is automatically' print 'set, and -n is ignored (if specified).' print print 'this version does not support arguments of the form -N as synonyms' print 'for -nN, nor does it support -n +N (or --lines +N). -f always' print 'follows a particular file descriptor rather than a file name, i.e.' print 'this version does not support the --follow=name behavior of gnu' print 'tail.' print # colorizes a line, returning a string suitable for printing to a # terminal. def colorize(ansi, colors, line): if line == "": return line colarray = len(line)*[None] # match each regexp in turn, and paint in the color array in the right # place(s)... for colpair in colors: pos = 0 m = colpair[0].search(line, pos) while m is not None: if len(m.groups()) > 0: colarray[m.start(1):m.end(1)] = len(m.group(1))*[colpair[1]] else: colarray[m.start():m.end()] = len(m.group())*[colpair[1]] pos = m.end() m = colpair[0].search(line, pos) colline = "" curcolor = None i = 0 # now draw the line with colors... while i < len(line): if colarray[i] != curcolor: if curcolor is not None: colline = colline+ansi["reset"] curcolor = colarray[i] if curcolor is not None: colline = colline+ansi[curcolor] colline = colline+line[i] i = i + 1 if curcolor is not None: colline = colline+ansi["reset"] return colline # this class provides a readline() method which reads a complete line, # even if it has to block and poll the file for awhile to do so. # it also provides a seek_file(n) method which places the file pointer # n lines from the end of the file. # # i'm not going to document it any further... it's pretty nasty, but # straightforward. class TailFile: position = 0 filename = None file = None follow = 0 readbuf = "" def __init__(self, filename, follow = 0): self.filename = filename if not os.path.exists(filename): print "no such file: '"+filename+"'." sys.exit(1) if os.path.isdir(filename): print "file '"+filename+"' is a directory." sys.exit(1) if not os.access(filename, os.R_OK): print "cannot read file '"+filename+"'. permission denied." sys.exit(1) self.file = open(self.filename, 'r') self.follow = follow def readline(self): index = string.find(self.readbuf, '\n') while index == -1: try: n = self.more_to_read() except Exception: return None if n == 0: if self.follow: time.sleep(1) else: return None else: self.readbuf = self.readbuf + self.file.read(n) index = string.find(self.readbuf, '\n') line = self.readbuf[:index+1] self.readbuf = self.readbuf[index+1:] return line def seek_lines(self, lines): bufsize = 2048 bytestoread = 2048 self.file.seek(0, 2) if lines == 0: return pos = self.file.tell() while pos != 0: pos = pos - bufsize if pos < 0: bytestoread = bytestoread + pos pos = 0 self.file.seek(pos) buf = self.file.read(bytestoread) j = bytestoread-1 while j >= 0: if buf[j] == '\n': lines = lines - 1 if lines < 0: self.file.seek(pos+j+1) return j = j - 1 self.file.seek(0) def end_of_file_position(self): return os.stat(self.filename)[stat.ST_SIZE] def more_to_read(self): end = self.end_of_file_position() if end < self.file.tell(): self.file.seek(end) return 0 return end - self.file.tell() # this class provides an abstraction of the mtail config file. it knows # how to read and parse a config file (view loadconfig()), and return the # appropriate configuration to use for a given filename (via getconfig()) # and for stdin (via getstdinconfig()). # # it's also pretty ugly... it might not be so bad if i hadn't written it at # three in the morning. class ConfigFile: # filename of our config file filename = None # configs with file regexps configs = [] # config for stdin stdinconfig = None # default config default = { 'colors': [], 'filters': [] } # these are the default ansi escape sequences we use... ansi = {} ansi["black"] = "\033[0;30m" ansi["red"] = "\033[0;31m" ansi["green"] = "\033[0;32m" ansi["yellow"] = "\033[0;33m" ansi["blue"] = "\033[0;34m" ansi["magenta"] = "\033[0;35m" ansi["cyan"] = "\033[0;36m" ansi["white"] = "\033[0;37m" ansi["brightblack"] = "\033[1;30m" ansi["brightred"] = "\033[1;31m" ansi["brightgreen"] = "\033[1;32m" ansi["brightyellow"] = "\033[1;33m" ansi["brightblue"] = "\033[1;34m" ansi["brightmagenta"] = "\033[1;35m" ansi["brightcyan"] = "\033[1;36m" ansi["brightwhite"] = "\033[1;37m" ansi["reset"] = "\033[0m" # create a new ConfigFile representation with the given filename. def __init__(self, filename): self.filename = filename # load all configs from the file, parse and store internally. def loadconfig(self): if self.filename == None or not (os.path.exists(self.filename) and os.path.isfile(self.filename) and os.access(self.filename, os.R_OK)): self.stdinconfig = self.default return config = None mode = None stdin = 0 default = 0 nonstdin = 0 lineno = 0 file = open(self.filename, "r") for line in file.readlines(): lineno = lineno+1 line = string.strip(line) if len(line) > 0 and line[0] != '#': if string.find(line, "files:") == 0: if config is not None: if nonstdin: self.configs.append(config) if stdin and self.stdinconfig is None: self.stdinconfig = config if default: self.default = config stdin = 0 nonstdin = 0 default = 0 filesre = None files = string.strip(line[6:]) while len(files) > 0: if string.find(files, 'stdin') == 0: stdin = 1 files = string.lstrip(files[5:]) elif string.find(files, 'default') == 0: default = 1 files = string.lstrip(files[7:]) else: sep = files[0] if (string.find(string.digits, sep) != -1 or string.find(string.letters, sep) != -1): print "configuration error! invalid regexp marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return reend = string.index(files, sep, 1) if reend == -1: print "configuration error! missing regexp end marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return rexp = files[1:reend] rexp = string.replace(rexp, '\\'+sep, sep) if filesre is None: filesre = '('+rexp+')' else: filesre = filesre+'|('+rexp+')' files = string.lstrip(files[reend+1:]) config = {} if filesre is not None: nonstdin = 1 config['files'] = re.compile(filesre) config['colors'] = [] config['filters'] = [] mode = None elif string.find(line, 'ansi:') == 0: if config is not None: if nonstdin: self.configs.append(config) if stdin and self.stdinconfig is None: self.stdinconfig = config if default: self.default = config mode = 'ansi' config = None elif string.find(line, 'colors:') == 0: if config is None: print "configuration error! 'colors:' found outside of 'files:' block at line " +`lineno`+ "." print "using no config!" return mode = 'colors' elif string.find(line, 'filters:') == 0: if config is None: print "configuration error! 'filters:' found outside of 'files:' block at line " +`lineno`+ "." print "using no config!" self.configs = [] return mode = 'filters' else: if mode == 'colors': sep = line[0] if (string.find(string.digits, sep) != -1 or string.find(string.letters, sep) != -1): print "configuration error! invalid regexp marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return reend = string.index(line, sep, 1) if reend == -1: print "configuration error! missing regexp end marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return rexp = line[1:reend] rexp = string.replace(rexp, '\\'+sep, sep) color = string.strip(line[reend+1:]) if len(color)==0: print "configuration error! missing color at line " +`lineno`+ "." print "using no config!" self.configs = [] return config['colors'].append((re.compile(rexp), color)) elif mode == 'filters': sep = line[0] if (string.find(string.digits, sep) != -1 or string.find(string.letters, sep) != -1): print "configuration error! invalid regexp marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return subend = string.index(line, sep, 1) if subend == -1: print "configuration error! missing second filter marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return sub = line[1:subend] sub = string.replace(sub, '\\'+sep, sep) replend = string.index(line, sep, subend+1) if subend == -1: print "configuration error! missing final filter marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return repl = line[subend+1:replend] repl = string.replace(repl, '\\'+sep, sep) config['filters'].append((re.compile(sub), repl)) elif mode == 'ansi': sep = line[0] if (string.find(string.digits, sep) != -1 or string.find(string.letters, sep) != -1): print "configuration error! invalid regexp marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return nameend = string.index(line, sep, 1) if nameend == -1: print "configuration error! missing second ansi marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return name = line[1:nameend] name = string.replace(name, '\\'+sep, sep) seqend = string.index(line, sep, nameend+1) if seqend == -1: print "configuration error! missing final ansi marker at line " +`lineno`+ "." print "using no config!" self.configs = [] return seq = line[nameend+1:seqend] seq = string.replace(seq, '\\'+sep, sep) self.ansi[name] = seq else: print "configuration error! this line doesn't belong here! at line " +`lineno`+ "." print "using no config!" self.configs = [] return if config is not None: if nonstdin: self.configs.append(config) if stdin and self.stdinconfig is None: self.stdinconfig = config if default: self.default = config if self.stdinconfig is None: self.stdinconfig = self.default self.checkconfigs() # check the configs and remove unrecognized colors. def checkconfigs(self): for config in self.configs + [self.stdinconfig, self.default]: for color in config['colors']: if not self.ansi.has_key(color[1]): config['colors'].remove(color) # get the config to use when coloring stdin def getstdinconfig(self): return (self.ansi, self.stdinconfig['colors'], lambda x, f=self.stdinconfig['filters']: reduce(lambda y, z, f=f: re.sub(z[0],z[1],y), f, x)) # get the config to use when coloring the file filename. filename should # not be a full pathname. def getconfig(self, filename): colors = self.default['colors'] filters = self.default['filters'] for config in self.configs: if config['files'].match(filename): colors = config['colors'] filters = config['filters'] break return (self.ansi, colors, lambda x, f=filters: reduce(lambda y, z, f=f: z[0].sub(z[1],y), f, x)) # now we'll call main()... main()