#!/usr/bin/perl # use IO::Socket; use IO::Select; # use Net::protoent; use Net::hostent; use Net::servent; # use File::Basename; use Time::Local; use Sys::Syslog; use POSIX ":sys_wait_h"; use Cflow qw(:flowvars :tcpflags :icmptypes :icmpcodes 1.041); # # %view = (); $main::view_definitions="/usr/local/etc/view_definitions.pl"; # $main::expire_tstamp=0; $main::time_window=3600; # данные хранятся только за последний час. $main::reconfig_requested=1; #$main::step = 300 ; $main::step = 60; # шаг шкалы времени. Каждый отсчёт - одна минута. # $main::prev_file=""; $main::prev_processed_file=""; $main::prev_mtime=time(); # $main::selected_view=""; $main::selected_class=""; $main::selected_class_buf=""; # $main::history_mode=0; # режим просмотра истории. =1 для потомков, форкнутых по # команде HISTORY MODE # #$main::daemon=0; # используется для отладки $main::daemon=1; $main::version = "_v0.4"; $main::my_name="flow_Server".$main::version; $main::my_addr="0.0.0.0"; $main::my_port=9009; $main::pidfile="/var/run/$my_name.pid"; # @hosts_files = ("/etc/hosts"); @protocols_files = ("/etc/protocols"); @services_files = ("/etc/services"); @asns_files = ("/usr/local/share/flow-tools/asn"); @typecode = ( ### ICMP typecodes [ 'echoreply' ], # 0 /* echo reply */ undef, # 1 undef, # 2 [ 'unreach', # 3 /* dest unreachable, codes: */ ['net_', # 0 /* bad net */ 'host_', # 1 /* bad host */ 'protocol_', # 2 /* bad protocol */ 'port_', # 3 /* bad port */ 'needfrag_', # 4 /* IP_DF caused drop */ 'srcfail_', # 5 /* src route failed */ 'net_unknown_', # 6 'host_unknown_', # 7 'host_isolated_', # 8 'net_ano_', # 9 'host_ano_', # 10 'net_unr_tos_', # 11 'host_unr_tos_', # 12 'pkt_filtered_', # 13 /* Packet filtered */ 'prec_violation_', # 14 /* Precedence violation */ 'prec_cutoff_', # 15 /* Precedence cut off */ ] ], [ 'source_quench' ], # 4 /* packet lost, slow down */ [ 'redirect', # 5 /* shorter route, codes: */ ['net_', # 0 /* for network */ 'host_', # 1 /* for host */ 'tosnet_', # 2 /* for tos and net */ 'toshost_'] ], # 3 /* for tos and host */ undef, # 6 undef, # 7 [ 'echo' ], # 8 /* echo service */ undef, # 9 undef, # 10 [ 'time_exceeded', # 11 /* time exceeded, code: */ ['intrans_', # 0 /* ttl==0 in transit */ 'reass_'] ], # 1 /* ttl==0 in reass */ [ 'paramprob' ], # 12 /* ip header bad */ [ 'timestamp' ], # 13 /* timestamp request */ [ 'timestampreply' ], # 14 /* timestamp reply */ [ 'info_request' ], # 15 /* information request */ [ 'info_reply' ], # 16 /* information reply */ [ 'maskreq' ], # 17 /* address mask request */ [ 'maskreply' ], # 18 /* address mask reply */ ); #---------------------------------------------------------- $flowsdir = "/var/netflow"; # каталог, в котором хранятся данные NetFlow $file_check_interval=5; # secs, период проверки наличия нового файла с данными $last_file_check=0; # # $main::timestamp=""; # метка времени последнего обработанного файла с данными. # $rd_sel = IO::Select->new(); # битмаски дескрипторов для select $wr_sel = IO::Select->new(); $er_sel = IO::Select->new(); # %fh_rdbuf=(); # массив буферов для чтения команд %fh_wrbuf=(); # массив буферов для записи результатов исполнения # sub printTime{ my(@curr_time)=localtime(); my($datetime)=sprintf("%.4d-%.2d-%.2d %.2d:%.2d:%.2d",$curr_time[5]+1900,$curr_time[4]+1, $curr_time[3],$curr_time[2],$curr_time[1],$curr_time[0]); return $datetime ; } sub flog { if ($main::daemon) { syslog('info', @_) } else { $_= printTime(); print STDOUT "$_ "; print STDOUT @_ ; print STDOUT "\n"; } }; sub ferr { if ($main::daemon) { syslog('err', @_) } else { $_= printTime(); print STDERR "$_ "; print STDERR @_; print STDOUT "\n"; } }; sub fwarn { if ($main::daemon) { syslog('warn', @_) } else { &ferr(@_); } }; # #------------------------------------------------------- sub round { $float=shift; $intgr=int($float+0.5); return $intgr; } # sub get_icmp_typecode { my($dport)= shift; $lb = $dport - int($dport/256)*256 ; $hb = int($dport/256); if ( defined $typecode[$hb]->[1]->[$lb] ) { return $typecode[$hb]->[1]->[$lb].$typecode[$hb]->[0] } else { return $typecode[$hb]->[0] } } #------------------------------------------------------- # %hosts = (); %protocols = (); %udp_services = (); %tcp_services = (); %asns = (); # @tables = (); # sub LoadHosts { my($fname) = shift; if ( open( HOSTS, $fname ) ) { while () { if ( /^([0-9\.]+)\s+([^\s]+)/ ) { #print "Ip=$1 NAME=$2\n"; $hosts{$1}=$2; } } close HOSTS; } } # sub LoadProtocols { my($fname) = shift; if ( open( PROTO, $fname ) ) { while () { if ( /^([^#^\s]+)\s+([0-9^\s]+)/ ) { #print "ProtoName=$1 Num=$2\n"; $protocols{$2}=$1; } } close PROTO; } } # sub LoadServices { my($fname) = shift; if ( open( SRVS, $fname ) ) { while () { if ( /^([^#^\s]+)\s+([0-9^]+)\/tcp/ ) { #print "S=$1 N=$2 tcp\n"; $tcp_services{$2}=$1; } if ( /^([^#^\s]+)\s+([0-9^]+)\/udp/ ) { #print "S=$1 N=$2 udp\n"; $udp_services{$2}=$1; } } close SRVS; } } # sub LoadAsns { my($fname) = shift; if ( open( ASNS, $fname ) ) { while () { if ( /^([0-9]+)\s+([^\s]+)/ ) { #print "ASNAME=$2 AS_Num=$1\n"; $asns{$1}=$2; } } close ASNS; } } # sub UpdateTables { for( $i=0; $i <=$#tables; $i++) { $hash_ref = $tables[$i]->[0]; $handler_ref = $tables[$i]->[1]; foreach $f ( keys %{$hash_ref} ) { if ( -r $f ) { my(@st)=stat($f); if ( $st[9] != ${$hash_ref}{$f} ) { ${$hash_ref}{$f}=$st[9]; #print "File $f changed. Reloading.\n"; &{$handler_ref}($f); } } } } } # sub InitTables { %::hosts = (); %::protocols = (); %::udp_services = (); %::tcp_services = (); %::asns = (); @::tables = (); my(%hosts_mtime) = (); for ( $i=0; $i<=$#hosts_files; $i++) { $hosts_mtime{$hosts_files[$i]}=0; } my(@hst) = (\%hosts_mtime, \&LoadHosts); my(%protocols_mtime) = (); for ( $i=0; $i<=$#protocols_files; $i++) { $protocols_mtime{$protocols_files[$i]}=0; } my(@proto) = (\%protocols_mtime, \&LoadProtocols); my(%services_mtime) = (); for ( $i=0; $i<=$#services_files; $i++) { $services_mtime{$services_files[$i]}=0; } my(@srv)= (\%services_mtime, \&LoadServices); my(%asns_mtime) = (); for ( $i=0; $i<=$#asns_files; $i++) { $asns_mtime{$asns_files[$i]}=0; } my(@asn) = (\%asns_mtime, \&LoadAsns); @tables = ( \@hst, \@proto, \@srv, \@asn ); &UpdateTables; } # #------------------------------------------------------- # sub filename2tstamp { # usage: $tstamp = &filename2tstamp($fname); my($fname) = shift; # expecting filename - ft-v05.YYYY-mm-DD.HHMMSS {+-TZSH } # expecting, file created on local mashine, TZ ignored here $_ = basename($fname,""); if ( /ft-v05\.([0-9]{4})-([0-9]{2})-([0-9]{2})\.([0-9]{2})([0-9]{2})([0-9]{2})/ ) { return timelocal($6,$5,$4,$3,$2-1,$1-1900); } return 0; } # sub tstamp2filename { # usage: $filename = &tstamp2filename($tstamp); my($tstamp) = shift; my(@gt) = gmtime($tstamp); my($tz_shift)= $tstamp -timelocal($gt[0],$gt[1],$gt[2],$gt[3],$gt[4],$gt[5]); my($tz_sign); if ( $tz_shift > 0 ) { $tz_sign="+"; } else { $tz_sign="-"; $tz_shift = -$tz_shift; } my(@t) = localtime($tstamp); $rez = sprintf ("%s/%.4d/%.4d-%.2d/%.4d-%.2d-%.2d/ft-v05.%.4d-%.2d-%.2d.%.2d%.2d%.2d%s%.2d%.2d", $flowsdir, $t[5]+1900, $t[5]+1900,$t[4]+1, $t[5]+1900,$t[4]+1,$t[3], $t[5]+1900,$t[4]+1,$t[3],$t[2],$t[1],$t[0], $tz_sign,$tz_shift/3600, $tz_shift/60 % 60 ); return $rez; } # sub perfile { my($fname) = shift; $main::timestamp = &filename2tstamp($fname); } # # Процедура исполняется над каждым потоком в текущем файле данных. # Потоки раскладываются по временным интервалам. Если время начала потока # и время его окончания укладываются в интервал - поток копируется туда полностью в неизменном виде. # Если же время окончания потока попадает в другой интервал - этот поток разбивается на # два ( или более ) потока, так, что каждый подпоток завершается внутри одного интервала, # а число байтов и пакетов разбивается пропорционально длительности каждого подпотока. # # данные: # ассоц. масив repacked{timestamp} -> @rFlows - ссылки на массив упакованных записей о потоках # ассоц. масив was_changed{timestamp} - флаг изменения массива потоков для данного интервала. # sub Repack_Flows { $t_st = $Cflow::startime; $t_en = $Cflow::endtime; # minimal sanity check $t_now = time(); if ( $t_st > $t_now || $t_en > $t_now ) { ferr("Error: flow timestamp more then nowtime [%s]",&tstamp2filename($main::timestamp)) ; return; }; if ( $t_st > $t_en ) { ferr("Error: flow start timestamp more then endtime [%s]",&tstamp2filename($main::timestamp)) ; return; }; # print "Исходный поток: $Cflow::startime $Cflow::endtime $Cflow::srcip:$Cflow::srcport $Cflow::dstip:$Cflow::dstport $Cflow::bytes $Cflow::pkts\n"; $start_interval = int($Cflow::startime/$main::step + 1) * $main::step; $end_interval = int($Cflow::endtime/$main::step + 1 ) * $main::step; # print " Интервалы: $start_interval $end_interval\n"; if ( $start_interval == $end_interval ) { # весь поток в 1м интервале if ( defined $repacked{$start_interval} ) { $idx = @{$repacked{$start_interval}}; } else { my(@rflow) = (); $repacked{$start_interval} = \@rflow; $idx=0; } # print " Записали в интервал $start_interval\n"; ${$repacked{$start_interval}}[$idx] = $Cflow::raw; $was_changed{$start_interval}=1; } else { $t_pr = $Cflow::endtime - $Cflow::startime; # длительность исходного потока $lbps = $Cflow::bytes/($t_pr); # скорость - байтов в секунду $lpps = $Cflow::pkts/($t_pr); # пакетов в секунду $n_itr = int($t_pr/$main::step) ; # на сколько интервалов растянулся поток $start_share = $start_interval - $t_st ; # часть потока в 1м интевале $end_share = $main::step - ($end_interval - $t_en) ; # часть потока в последнем интевале # print " Скорости: bps=$lbps pps=$lpps\n"; $Cflow::endtime = $start_interval; $Cflow::bytes = round($start_share*$lbps); $Cflow::pkts = round($start_share*$lpps); if ( defined $repacked{$start_interval} ) { $idx = @{$repacked{$start_interval}} ; } else { my(@rflow) = (); $repacked{$start_interval} = \@rflow; $idx=0; } # print " Стартовый поток: [ $start_interval ] $Cflow::startime $Cflow::endtime $Cflow::srcip:$Cflow::srcport $Cflow::dstip:$Cflow::dstport $Cflow::bytes $Cflow::pkts\n"; ${$repacked{$start_interval}}[$idx] = $Cflow::reraw; # создали поток в 1м интервале $was_changed{$start_interval}=1; $Cflow::startime = $end_interval-$main::step; $Cflow::endtime = $t_en; $Cflow::bytes = round($end_share*$lbps); $Cflow::pkts = round($end_share*$lpps); if ( defined $repacked{$end_interval} ) { $idx = @{$repacked{$end_interval}}; } else { my(@rflow) = (); $repacked{$end_interval} = \@rflow; $idx=0; } # print " Конечный поток: [ $end_interval ] $Cflow::startime $Cflow::endtime $Cflow::srcip:$Cflow::srcport $Cflow::dstip:$Cflow::dstport $Cflow::bytes $Cflow::pkts\n"; ${$repacked{$end_interval}}[$idx] = $Cflow::reraw; # создали поток в последнем интервале $was_changed{$end_interval}=1; $n_itr = int(($t_pr - ($start_share+$stop_share) )/$main::step) ; # число промежуточных интервалов # print " Число промежуточных интервалов: $n_itr\n"; $curr_interval = $start_interval + $main::step; $Cflow::bytes = round($main::step*$lbps); $Cflow::pkts = round($main::step*$lpps); for ( $i=0; $i<$n_itr; $i++ ) { $Cflow::startime = $curr_interval ; $Cflow::endtime = $curr_interval + $main::step ; if ( defined $repacked{$curr_interval} ) { $idx = @{$repacked{$curr_interval}}; } else { my(@rflow) = (); $repacked{$curr_interval} = \@rflow; $idx=0; } # print " Промежуточный поток: [ $curr_interval ] $Cflow::startime $Cflow::endtime $Cflow::srcip:$Cflow::srcport $Cflow::dstip:$Cflow::dstport $Cflow::bytes $Cflow::pkts\n"; $repacked{$curr_interval}->[$idx] = $Cflow::reraw; # создали поток в промежуточном интервале $was_changed{$curr_interval}=1; $curr_interval = $curr_interval + $main::step; } } } # # Процедура выполняет полную переклассификацию над всеми переупакованными потоками # sub Classify_Repacked { foreach $t ( keys %was_changed ) { # итерация по обновленным интервалам foreach $v ( keys %view ) { $cnt_ref = $view{$v}->[6]; # список отсчетов для view my(%cnt); ${$cnt_ref}{$t}=\%cnt; } for ( $i=0; $i<@{$repacked{$t}}; $i++ ) { # итерация по всем потокам в интервале ($Cflow::index, $Cflow::exporter, $Cflow::srcaddr, $Cflow::dstaddr, $Cflow::input_if, $Cflow::output_if, $Cflow::srcport, $Cflow::dstport, $Cflow::pkts, $Cflow::bytes, $Cflow::nexthop, $Cflow::startime, $Cflow::endtime, $Cflow::protocol, $Cflow::tos, $Cflow::src_as, $Cflow::dst_as, $Cflow::src_mask, $Cflow::dst_mask, $Cflow::tcp_flags, $Cflow::engine_type, $Cflow::engine_id) = unpack($Cflow::entry,$repacked{$t}->[$i] ); foreach $v ( keys %view ) { $sub_ref = $view{$v}->[1]; # процедура-фильтр для view $autoclass_ref= $view{$v}->[2]; # процедура динамической классификации $cnt_ref = $view{$v}->[6]; # список отсчетов для view if ( &{$sub_ref} ) { my($class) = &{$autoclass_ref}; $cl_cnt_ref = ${$cnt_ref}{$t}; if ( exists ${$cl_cnt_ref}{$class} ) { ${$cl_cnt_ref}{$class} = ${$cl_cnt_ref}{$class} + $Cflow::bytes; } else { ${$cl_cnt_ref}{$class} = $Cflow::bytes; } } } } } } # # процедура печати текущего потока по умолчанию # sub PrintFlow { return "$Cflow::protocol $Cflow::srcip:$Cflow::srcport $Cflow::dstip:$Cflow::dstport\n"; } # # процедура печатает потоки попавшие в класс в данном интервале # sub PrintClassFlows { my($sub_ref) = $view{$main::selected_view}->[1]; # процедура-фильтр для view my($autoclass_ref) = $view{$main::selected_view}->[2]; # процедура динамической классификации my($cnt_ref) = $view{$main::selected_view}->[6]; # ассоц. массив отсчетов. my($printFlow_ref) = \&PrintFlow; if ( ref $view{$main::selected_view}->[4] eq 'CODE' ) { $printFlow_ref = $view{$main::selected_view}->[4]; } if ( &{$sub_ref} ) { my($class) = &{$autoclass_ref}; $cl_cnt_ref = ${$cnt_ref}{$main::selected_interval}; if ( $class eq $main::selected_class ) { $_ = &{$printFlow_ref}; $main::selected_class_buf=$main::selected_class_buf.$_; } elsif ( ($main::selected_class eq "other") && (! exists ${$cl_cnt_ref}{$class} ) ) { $_ = &{$printFlow_ref}; $main::selected_class_buf=$main::selected_class_buf.$_; } } } # sub CompactCounters { my($cnt_ref,$n_top) = @_; my(%new_cnt)=("other",0); my($k); foreach $k ( sort { ${$cnt_ref}{$b} <=> ${$cnt_ref}{$a} || $a cmp $b } keys %{$cnt_ref} ) { if ($n_top > 0 ) { $new_cnt{$k} = ${$cnt_ref}{$k}; $n_top--; } else { $new_cnt{"other"} = $new_cnt{"other"} + ${$cnt_ref}{$k}; } } return \%new_cnt; } # # Процедура исполняется вслед за процедурой классификации, # и производит сокращение классов до нужного количества, так что маленькие # классы суммируются в классе "other" # sub post_Process { foreach $t ( keys %was_changed ) { # итерация по изменённым интервалам foreach $v ( keys %view ) { my($n_top) = $view{$v}->[3]; # количество top классов для view my($cnt_ref) = $view{$v}->[6]; # список отсчетов для view $cl_cnt_ref = ${$cnt_ref}{$t}; # массив отсчетов ${$cnt_ref}{$t} = &CompactCounters($cl_cnt_ref,$n_top); undef %{$cl_cnt_ref}; } delete $was_changed{$t}; } } # # # Процедура проверяет, появился ли новый файл с данными, и если да, разбирает его. # Алгоритм основывается на том факте, что программа flow-capture создает # временный файл в названии которго используется время создания, который затем # становится новым файлом с данными. # sub reScanFile { my(@t)=localtime($main::prev_mtime); my($subpath) = sprintf("%.4d/%.4d-%.2d/%.4d-%.2d-%.2d",$t[5]+1900, $t[5]+1900,$t[4]+1,$t[5]+1900,$t[4]+1,$t[3] ); my(@st)=stat("$flowsdir/$subpath"); $f = "$flowsdir/$subpath/ft-$main::prev_file"; if ( $st[9] !=$prev_mtime ) { $prev_mtime=$st[9]; if ( -r $f) { # flog("ReScanFile: Processing $f"); my($tstamp) = &filename2tstamp("$f"); if ( ! exists $datafile{$tstamp} ) { $rez = Cflow::find(\&Repack_Flows, \&perfile, "$f"); &Classify_Repacked; &post_Process; $datafile{$tstamp} = 1; } } my($tmp_file_mask) = "$flowsdir/$subpath/tmp-v05.*"; my($wcnt)=60; my(@tfiles) = sort(glob $tmp_file_mask); while( $#tfiles<0 && $wcnt ) { sleep 1; $wcnt--; @tfiles = sort(glob $tmp_file_mask); } foreach $f ( @tfiles ) { $_=$f; if ( /.+\/tmp-([^ \/]+)$/ ) { $main::prev_file = $1; } } } } # # процедура удаляет старые отсчеты, отстоящие более чем time_window # от настоящего времени. # sub expireCounters { $main::expire_tstamp = $main::timestamp - $main::time_window; foreach $v ( keys %view ) { $v_ref = $view{$v}; $cnt_ref = ${$v_ref}[6]; # список отсчетов для view foreach $t ( sort(keys %{$cnt_ref}) ) { if ( $t < $main::expire_tstamp ) { # flog("Expired data at $t on $v"); undef %{$cnt_ref->{$t}}; delete ${$cnt_ref}{$t}; } else { break; }; } } foreach $t ( sort(keys %repacked )) { if ( $t < $main::expire_tstamp ) { undef @{$repacked{$t}}; delete $repacked{$t}; } } foreach $t ( sort(keys %datafiles)) { if ($t < $main::expire_tstamp ) { delete $datafile{$t}; } } } # # процедура ищет netflow файлы, созданные в течении некоторого интервала времени # (определяемого параметрами при вызове) и обрабатывает эти файлы. # sub loadHistory { my($curtstamp,$stop_tstamp) = @_; my(@t); my($mask); my($f); my($last_file); while ( $curtstamp < $stop_tstamp ) { @t=localtime($curtstamp); $fmask = sprintf("$flowsdir/%.4d/%.4d-%.2d/%.4d-%.2d-%.2d/ft-v05.%.4d-%.2d-%.2d.%.2d%.2d*", $t[5]+1900, $t[5]+1900,$t[4]+1, $t[5]+1900,$t[4]+1,$t[3], $t[5]+1900,$t[4]+1,$t[3],$t[2],$t[1]); foreach $f ( glob $fmask ) { my($tstamp) = &filename2tstamp($f); if ( ! exists $datafile{$tstamp} ) { # print("loadHistry: Processing $f\n"); $rez = Cflow::find(\&Repack_Flows,\&perfile, "$f"); $datafile{$tstamp} = 1; } } $curtstamp=$curtstamp+30; } &Classify_Repacked; &post_Process; } # # Основная процедура автомата обработки клиенских запросов. # По приходу новых данных от клиента проверяется признак конца команды (пустая строка). # Обнаруженная команда разбирается и исполняется. Результат выполнения команды ставится # в очередь на отправку клиенту. # # Команда - последовательность непустых строк, за которой следует пустая строка. # Результат - последовательность непустых строк, за которой следует пустая строка. # # sub scan_cmd { my($fh)=shift; $fh_rdbuf{$fh}=~s/^\r\n\r\n//; $_=$fh_rdbuf{$fh}; if ( /(.+)\r\n\r\n/ ) { # flog("Got cmd: '$1'"); $_=$1; $fh_rdbuf{$fh}=~s/.+\r\n\r\n//; SWITCH: { /^GET PARAMS$/ && do { # GET PARAMS $fh_wrbuf{$fh}=$main::version." ".$main::step." ".$main::time_window."\r\n\r\n"; last SWITCH; }; /^GET VIEWS$/ && do { # GET VIEWS my($v)=""; foreach $k ( keys %view ) { $v=$v."{$k}\r\n"; }; $fh_wrbuf{$fh}="$v\r\n"; last SWITCH; }; /^DESCRIBE VIEW \"([^\"]+)\"$/ && do { if ( exists $view{$1} ) { $fh_wrbuf{$fh}="$view{$1}->[0]\r\n\r\n"; } else { $fh_wrbuf{$fh}="\r\n"; } last SWITCH; }; /^GET FLOWS \"([^\"]+)\" \"([^\"]+)\" ([0-9]+)$/ && do { # GET FLOWS "view" "class" timestamp $main::selected_view=$1; $main::selected_class=$2; $main::selected_interval = $3; $main::selected_class_buf=""; if ( exists $view{$main::selected_view} ) { if ( exists $repacked{$main::selected_interval} ) { # for ( $i=0; $i<$#{$repacked{$main::selected_interval}}; $i++ ) { # итерация по всем потокам в интервале for ( $i=0; $i<@{$repacked{$main::selected_interval}}; $i++ ) { # итерация по всем потокам в интервале ($Cflow::index, $Cflow::exporter, $Cflow::srcaddr, $Cflow::dstaddr, $Cflow::input_if, $Cflow::output_if, $Cflow::srcport, $Cflow::dstport, $Cflow::pkts, $Cflow::bytes, $Cflow::nexthop, $Cflow::startime, $Cflow::endtime, $Cflow::protocol, $Cflow::tos, $Cflow::src_as, $Cflow::dst_as, $Cflow::src_mask, $Cflow::dst_mask, $Cflow::tcp_flags, $Cflow::engine_type, $Cflow::engine_id) = unpack($Cflow::entry,$repacked{$main::selected_interval}->[$i] ); &PrintClassFlows; } } } $fh_wrbuf{$fh}=$main::selected_class_buf."\r\n"; last SWITCH; }; /^GET FLOWS LEGEND \"([^\"]+)\"$/ && do { # GET FLOWS LEGEND "view" $main::selected_view=$1; if ( exists $view{$1} ) { if ( defined $view{$1}->[5] ) { $fh_wrbuf{$fh} = $view{$1}->[5]; } else { $fh_wrbuf{$fh}="1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24"; } } $fh_wrbuf{$fh}=$fh_wrbuf{$fh}."\r\n\r\n"; last SWITCH; }; /^UPDATE \"([^\"]+)\" ([0-9]+)$/ && do { # UPDATE "view" timestamp my($v)=""; if ( exists $view{$1} ) { # flog "$fh $1 $2\n"; $cnt_ref=$view{$1}->[6]; @c_ar = sort(keys %{$cnt_ref}); $last = @c_ar - 1; undef $c_ar[$last]; foreach $k ( @c_ar ) { # flog " $k"; if ( $k > $2 ) { $v=$v."{$k} {"; $data_ref=$cnt_ref->{$k}; foreach $c ( sort { ${$data_ref}{$b} <=> ${$data_ref}{$a} || $a cmp $b } keys %{$data_ref} ) { $v=$v."{$c} {$data_ref->{$c}} "; } $v=$v."}\r\n"; } }; } $fh_wrbuf{$fh}="$v\r\n"; last SWITCH; }; /^HISTORY \"([^\"]+)\" ([0-9]+) ([0-9]+)$/ && do { # HISTORY "view" timestamp timewindow my($v)="";my($tstart)=$2; my($tstop)=$2+$3; if ( $main::history_mode==0 ) { $v="NOT IN HISTORY MODE"; } else { &loadHistory($tstart,$tstop); if ( exists $view{$1} ) { $cnt_ref=$view{$1}->[6]; foreach $k ( sort(keys %{$cnt_ref}) ) { if ( ($k >= $tstart ) && ($k < $tstop ) ) { $v=$v."{$k} {"; $data_ref=$cnt_ref->{$k}; foreach $c ( sort { ${$data_ref}{$b} <=> ${$data_ref}{$a} || $a cmp $b } keys %{$data_ref} ) { $v=$v."{$c} {$data_ref->{$c}} "; } $v=$v."}\r\n"; } } } } $fh_wrbuf{$fh}="$v\r\n"; last SWITCH; }; /^HISTORY MODE$/ && do { if ( fork() == 0 ) { $main::history_mode=1; setpgrp(0,0); # закрыть все сокеты кроме текущего. $rd_sel->remove($sock); $er_sel->remove($sock); $sock->close; my($local_fh); foreach $local_fh ( $er_sel->handles ) { if ($local_fh != $fh ){ $rd_sel->remove($local_fh); $wr_sel->remove($local_fh); $er_sel->remove($local_fh); undef $fh_wrbuf{$local_fh}; undef $fh_rdbuf{$local_fh}; $local_fh->close; } } $fh_wrbuf{$fh}="HISTORY MODE CHILD [$$]\r\n\r\n"; last SWITCH; } # В серверном процессе - закрыть сокет. Его будет обрабатывать потомок. $rd_sel->remove($fh); $er_sel->remove($fh); undef $fh_wrbuf{$fh}; undef $fh_rdbuf{$fh}; $fh->close; return; }; /^QUIT$/ && do { $rd_sel->remove($fh); $er_sel->remove($fh); undef $fh_wrbuf{$fh}; undef $fh_rdbuf{$fh}; $fh->close; return; }; $fh_wrbuf{$fh}="Unknown command:'$_'\r\n\r\n"; }; $rd_sel->remove($fh); $wr_sel->add($fh); }; }; #------------------------------------------------------------------------------------ # # Главный цикл # #------------------------------------------------------------------------------------ # $SIG{PIPE} = sub {}; $SIG{HUP} = sub { $main::reconfig_requested=1; }; # $sock = IO::Socket::INET->new( Listen => 5, LocalAddr => $my_addr, LocalPort => $my_port, Proto => 'tcp', Reuse => 'SO_REUSEADDR' ); if ( !defined $sock ) { print "Can't bind socket: $my_addr:$my_port\n"; exit(1); } $rd_sel->add($sock); $er_sel->add($sock); if ( $main::daemon ) { die "Can't open logfile!\n" unless openlog("$main::my_name", "ndelay,pid","daemon"); flog("Starting $main::my_name [$$]."); $SIG{TERM} = sub { flog("Exiting $main::my_name [$$] by signal.\n"); unlink $main::pidfile; closelog(); exit(0); }; # if ( fork() > 0 ) { exit 0; }; setpgrp(0,0); # if (open(PID, ">$main::pidfile" )) { print PID "$$\n"; close PID; } else { ferr("Can't open pidfile '$main::pidfile'!\n"); } } else { # used for debuging flog("Starting $main::my_name [$$]."); $SIG{TERM} = sub { flog "Got SIGTERM. Exiting"; exit 0; }; } $select_tmo = 0; while( true ) { ($rd_ready, $wr_ready, $got_err) = IO::Select::select( $rd_sel, $wr_sel, $er_sel, $select_tmo ); foreach $fh ( @{$rd_ready} ) { if ( $fh == $sock ) { $newsock = $sock->accept; $rd_sel->add($newsock); $er_sel->add($newsock); $fh_rdbuf{$fh}=$fh_wrbuf{$fh}=""; } else { $len = $fh->sysread($rez, 1024 ); if ( $len >0 ) { $fh_rdbuf{$fh}=$fh_rdbuf{$fh}.$rez; &scan_cmd($fh); } else { # socket closed $rd_sel->remove($fh); $er_sel->remove($fh); undef($fh_wrbuf{$fh}); undef($fh_rdbuf{$fh}); close $fh; } } } foreach $fh ( @{$wr_ready} ) { if ( $fh == $sock ) { } else { $len = $fh->syswrite( $fh_wrbuf{$fh},length($fh_wrbuf{$fh})); if ( $len != length($fh_wrbuf{$fh}) ) { $fh_wrbuf{$fh} = substr($fh_wrbuf{$fh},$len,length($fh_wrbuf{$fh})-$len); } else { $fh_wrbuf{$fh}=""; $wr_sel->remove($fh); $rd_sel->add($fh); } } } foreach $fh ( @{$got_err} ) { if ( $fh == $sock ) { ferr("Fatal error on listening socket!"); exit(1); } else { ferr("Got error on $fh"); $wr_sel->remove($fh); $rd_sel->remove($fh); $er_sel->remove($fh); undef($fh_wrbuf{$fh}); undef($fh_rdbuf{$fh}); close $fh; } } if ( $main::reconfig_requested ) { unless ( $rez = do $main::view_definitions ) { ferr("Couldn't parse $main::view_definitions: $@") if $@; ferr("Couldn't do $main::view_definitions: $!") unless defined $rez; ferr("Couldn't run $main::view_definitions: $@") unless $rez; } foreach $k ( keys %view ) { my(%empty_hash); $view{$k}->[6]=\%empty_hash; } flog("Reconfig Ok."); &InitTables; $main::history_start = time()-$main::time_window; $main::history_stop = time(); ### &loadHistory(time()-$main::time_window,time()); $select_tmo=0; $main::reconfig_requested=0; } ## flog("Select_tmo $select_tmo"); if ($main::history_start < $main::history_stop ) { &loadHistory($main::history_start, $main::history_start+300); ## flog("Checking old data from $main::history_start (300)."); $main::history_start = $main::history_start+300; if ($main::history_start >= $main::history_stop ) { $select_tmo = $file_check_interval; } } if ( $main::history_mode==1 ) { if ( $er_sel->handles == 0 ) { exit(0); } } else { my($kid); $kid=waitpid(-1,&WNOHANG); if ( $last_file_check + $file_check_interval <= time() ) { $last_file_check = time(); &reScanFile; &expireCounters; &UpdateTables; } } }; #