diff options
author | Andreas Unterkircher <unki@netshadow.at> | 2008-12-03 20:37:13 +0100 |
---|---|---|
committer | Andreas Unterkircher <unki@netshadow.at> | 2008-12-12 18:36:55 +0100 |
commit | 0a6e4fae2c79d5f9da1033e0a51abfc69e10b8b2 (patch) | |
tree | 041b13746bede1eeceec181a8a00405e26d9db36 /lib/exilog_sql.pm | |
parent | 226ad0a3c764c0606048acf7371b02765eee60d2 (diff) | |
download | exilog-0a6e4fae2c79d5f9da1033e0a51abfc69e10b8b2.zip exilog-0a6e4fae2c79d5f9da1033e0a51abfc69e10b8b2.tar.gz exilog-0a6e4fae2c79d5f9da1033e0a51abfc69e10b8b2.tar.bz2 |
sort files into their directories. move agent- and cleanup-script into 'agents', all static www-content (icons, stylesheet, javascript, ...) into 'htdocs'. cgi-stuff into 'cgi' and all reused code into 'lib'.
Signed-off-by: Andreas Unterkircher <unki@netshadow.at>
Diffstat (limited to 'lib/exilog_sql.pm')
-rw-r--r-- | lib/exilog_sql.pm | 556 |
1 files changed, 556 insertions, 0 deletions
diff --git a/lib/exilog_sql.pm b/lib/exilog_sql.pm new file mode 100644 index 0000000..fc8bc71 --- /dev/null +++ b/lib/exilog_sql.pm @@ -0,0 +1,556 @@ +#!/usr/bin/perl +# +# This file is part of the exilog suite. +# +# http://duncanthrax.net/exilog/ +# +# (c) Tom Kistner 2004 +# +# See LICENSE for licensing information. +# + +package exilog_sql; +use strict; +use DBI; +use exilog_config; +use exilog_util; + +use Data::Dumper; + +BEGIN { + use Exporter; + use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS); + + # set the version for version checking + $VERSION = 0.1; + @ISA = qw(Exporter); + @EXPORT = qw( + &reconnect + &sql_select + &sql_delete + &sql_optimize + &sql_count + &sql_queue_add + &sql_queue_update + &sql_queue_delete + &sql_queue_set_action + &sql_queue_clear_action + &write_message + ); + + %EXPORT_TAGS = (); + + # your exported package globals go here, + # as well as any optionally exported functions + @EXPORT_OK = qw(); +} + + +# open DB connection +my $dbh = DBI->connect($config->{sql}->{DBI}, $config->{sql}->{user}, $config->{sql}->{pass}); +unless (defined($dbh) && $dbh) { + print STDERR "[exilog_sql] Can't open exilog database.\n"; + exit(255); +}; + +sub reconnect { + my $conditional = shift || 0; + if ($conditional) { + return 1 if ($dbh->ping); + }; + eval { + $dbh->disconnect() if (defined($dbh)); + }; + $dbh = 0; + $dbh = DBI->connect($config->{sql}->{DBI}, $config->{sql}->{user}, $config->{sql}->{pass}); + unless (defined($dbh) && $dbh) { + print STDERR "[exilog_sql] Can't open exilog database.\n"; + return 0; + }; + return 1; +}; + + +# -------------------------------------------------------------------------- +# Generic Stubs, these are just frontends that call the backend-specific +# SQL subroutines for each database type. +sub write_message { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_write_message" }(@_); +}; + +sub sql_select { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_select" }(@_); +}; + +sub sql_delete { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_delete" }(@_); +}; + +sub sql_optimize { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_optimize" }(@_); +}; + +sub sql_queue_add { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_queue_add" }(@_); +}; + +sub sql_queue_update { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_queue_update" }(@_); +}; + +sub sql_queue_delete { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_queue_delete" }(@_); +}; + +sub sql_queue_set_action { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_queue_set_action" }(@_); +}; + +sub sql_queue_clear_action { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_queue_clear_action" }(@_); +}; + +sub sql_count { + no strict "refs"; + return &{ "_".$config->{sql}->{type}."_sql_count" }(@_); +}; +# -------------------------------------------------------------------------- + + +# -------------------------------------------------------------------------- +# PostgreSQL functions +sub _pgsql_sql_count { + my $where = shift; + my $criteria = shift || {}; + + my $sql = "SELECT ". + "COUNT(*) ". + "FROM ".$where. + ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ); + + my $sh = $dbh->prepare($sql); + $sh->execute; + my $tmp = $sh->fetchrow_arrayref(); + return @{$tmp}[0]; +}; + +sub _pgsql_sql_queue_delete { + my $spool_path = shift; + + $dbh->do("DELETE FROM queue WHERE spool_path='$spool_path'"); +}; + +sub _pgsql_sql_queue_update { + my $hdr = shift; + + return unless (ref($hdr) eq 'HASH'); + + my $server = $hdr->{server}; + my $message_id = $hdr->{message_id}; + delete $hdr->{server}; + delete $hdr->{message_id}; + + # PostgreSQL is case sensitive by default. Nice feature, + # but it complicates our life tremendously. + # Since we want to keep indexes working, the columns in + # this list are lowercased before they are inserted. Sigh. + my @lowercase = ( 'mailfrom', 'recipients_delivered', 'recipients_pending' ); + foreach my $col (@lowercase) { + $hdr->{$col} = lc($hdr->{$col}) if (edt($hdr,$col)); + }; + + my @tmp; + foreach my $item (keys %{ $hdr }) { + my $value = $hdr->{$item}; + $value =~ s/\'/\'\'/g; + $value =~ s/\n/\\n/g; + push @tmp, $item.'='."'".$value."'"; + }; + + $dbh->do("UPDATE queue SET ".join(",",@tmp)." WHERE message_id='".$message_id."' AND server='".$server."'"); +}; + +sub _pgsql_sql_queue_add { + my $hdr = shift; + + return unless (ref($hdr) eq 'HASH'); + + # PostgreSQL is case sensitive by default. Nice feature, + # but it complicates our life tremendously. + # Since we want to keep indexes working, the columns in + # this list are lowercased before they are inserted. Sigh. + my @lowercase = ( 'mailfrom', 'recipients_delivered', 'recipients_pending' ); + foreach my $col (@lowercase) { + $hdr->{$col} = lc($hdr->{$col}) if (edt($hdr,$col)); + }; + + my @fields = sort {$a cmp $b} keys(%{$hdr}); + my @vals = (); + foreach (@fields) { + my $val = $hdr->{$_}; + $val =~ s/\'/\'\'/g; + $val =~ s/\n/\\n/g; + push @vals, "'".$val."'"; + }; + + $dbh->do("INSERT INTO queue (".join(',',@fields).") VALUES(".join(',',@vals).")"); +}; + +sub _pgsql_sql_optimize { + my $where = shift || "nothing"; + + my $sql = "OPTIMIZE TABLE ".$where; + my $sh = $dbh->prepare($sql); + $sh->execute; + $sh->finish; + + return 1; +}; + +sub _pgsql_sql_delete { + my $where = shift || "nothing"; + my $criteria = shift || {}; + + my $sql = "DELETE FROM ".$where. + ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ); + + my $sh = $dbh->prepare($sql); + my $num = $sh->execute; + $sh->finish; + + return (($num eq '0E0') ? 0 : $num); +}; + +sub _pgsql_sql_select { + my $where = shift; + my @what = @{ (shift || [ "*" ]) }; + my $criteria = shift || {}; + my $order_by = shift || ""; + my $order_direction = shift || "DESC"; + my $limit_min = shift; + my $limit_max = shift; + my $distinct = shift; + + my $sql = "SELECT ". + (defined($distinct) ? "DISTINCT " : ""). + join(", ", @what). + " FROM ".$where. + ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ). + ($order_by ? " ORDER BY ".$order_by." ".$order_direction : ""). + (defined($limit_min) ? " LIMIT ".$limit_min : ""). + (defined($limit_max) ? ",".$limit_max : ""); + + return _fetch_multirow($where, $sql); +}; + +sub _pgsql_write_message { + my $server = shift || 'default'; + my $h = shift; + my $rc = 0; + + # PostgreSQL is case sensitive by default. Nice feature, + # but it complicates our life tremendously. + # Since we want to keep indexes working, the columns in + # this list are lowercased before they are inserted. Sigh. + my @lowercase = ( 'mailfrom', 'rcpt', 'rcpt_final', 'host_dns', 'host_helo', 'host_rdns' ); + foreach my $col (@lowercase) { + $h->{data}->{$col} = lc($h->{data}->{$col}) if (edt($h->{data},$col)); + }; + + # Special case: we only need to UPDATE the 'completed' field + # in the messages table. + if ( ($h->{table} eq 'messages') && (exists($h->{data}->{completed})) ) { + my $rc = $dbh->do("UPDATE messages SET completed='".$h->{data}->{completed}."' WHERE message_id='".$h->{data}->{message_id}."' AND server='".$server."'"); + if (defined($rc)) { + return 1; + } + else { + # error + return 0; + }; + } + else { + my @fields = sort {$a cmp $b} keys(%{$h->{data}}); + my @vals = ( "'".$server."'" ); + foreach (@fields) { + my $val = $h->{data}->{$_}; + $val =~ s/\'/\'\'/g; + # shorten $val to limit and remove eventual + # trailing quote and backslash characters. + $val = substr($val,0,255); + $val =~ s/[\\']+$//; + push @vals, "'".$val."'"; + }; + unshift @fields, 'server'; + + my $sql = "INSERT INTO ".$h->{table}.' ("'.join('","',@fields).'") VALUES('.join(',',@vals).")"; + my $rc = $dbh->do($sql); + + if (defined($rc)) { + return 1; + } + else { + return 2 if ($dbh->errstr =~ /duplicate/i); + print STDERR "SQL Error (code ".$dbh->err.") on '$h->{table}' with query: $sql\n"; + return 0; + }; + }; +}; + + +# -------------------------------------------------------------------------- +# MySQL functions +sub _mysql_sql_count { + my $where = shift; + my $criteria = shift || {}; + + my $sql = "SELECT ". + "COUNT(*) ". + "FROM ".$where. + ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ); + + my $sh = $dbh->prepare($sql); + $sh->execute; + my $tmp = $sh->fetchrow_arrayref(); + return @{$tmp}[0]; +}; + +sub _mysql_sql_queue_delete { + my $spool_path = shift; + + $dbh->do("DELETE FROM queue WHERE spool_path='$spool_path'"); +}; + +sub _mysql_sql_queue_update { + my $hdr = shift; + + return unless (ref($hdr) eq 'HASH'); + + my $server = $hdr->{server}; + my $message_id = $hdr->{message_id}; + delete $hdr->{server}; + delete $hdr->{message_id}; + + my @tmp; + foreach my $item (keys %{ $hdr }) { + my $value = $hdr->{$item}; + $value =~ s/\'/\'\'/g; + $value =~ s/\n/\\n/g; + push @tmp, $item.'='."'".$value."'"; + }; + + $dbh->do("UPDATE queue SET ".join(",",@tmp)." WHERE message_id='".$message_id."' AND server='".$server."'"); +}; + +sub _mysql_sql_queue_add { + my $hdr = shift; + + return unless (ref($hdr) eq 'HASH'); + + my @fields = sort {$a cmp $b} keys(%{$hdr}); + my @vals = (); + foreach (@fields) { + my $val = $hdr->{$_}; + $val =~ s/\'/\'\'/g; + $val =~ s/\n/\\n/g; + push @vals, "'".$val."'"; + }; + + $dbh->do("INSERT INTO queue (".join(',',@fields).") VALUES(".join(',',@vals).")"); +}; + +sub _mysql_sql_queue_set_action { + my $server = shift; + my $message_id = shift; + my $action = shift; + + $dbh->do("UPDATE queue SET action='$action' WHERE server='$server' AND message_id='$message_id'"); +}; + +sub _mysql_sql_queue_clear_action { + my $server = shift; + my $message_id = shift; + + $dbh->do("UPDATE queue SET action=NULL WHERE server='$server' AND message_id='$message_id'"); +}; + + +sub _mysql_sql_optimize { + my $where = shift || "nothing"; + + my $sql = "OPTIMIZE TABLE ".$where; + my $sh = $dbh->prepare($sql); + $sh->execute; + $sh->finish; + + return 1; +}; + +sub _mysql_sql_delete { + my $where = shift || "nothing"; + my $criteria = shift || {}; + + my $sql = "DELETE FROM ".$where. + ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ); + + my $sh = $dbh->prepare($sql); + my $num = $sh->execute; + $sh->finish; + + return (($num eq '0E0') ? 0 : $num); +}; + +sub _mysql_sql_select { + my $where = shift; + my @what = @{ (shift || [ "*" ]) }; + my $criteria = shift || {}; + my $order_by = shift || ""; + my $order_direction = shift || "DESC"; + my $limit_min = shift; + my $limit_max = shift; + my $distinct = shift; + + my $sql = "SELECT ". + (defined($distinct) ? "DISTINCT " : ""). + join(", ", @what). + " FROM ".$where. + ((scalar keys %{ $criteria } ) ? " "._build_WHERE($criteria) : "" ). + ($order_by ? " ORDER BY ".$order_by." ".$order_direction : ""). + (defined($limit_min) ? " LIMIT ".$limit_min : ""). + (defined($limit_max) ? ",".$limit_max : ""); + + return _fetch_multirow($where, $sql); +}; + +sub _mysql_write_message { + my $server = shift || 'default'; + my $h = shift; + my $rc = 0; + + # Special case: we only need to UPDATE the 'completed' field + # in the messages table. + if ( ($h->{table} eq 'messages') && (exists($h->{data}->{completed})) ) { + my $rc = $dbh->do("UPDATE messages SET completed='".$h->{data}->{completed}."' WHERE message_id='".$h->{data}->{message_id}."' AND server='".$server."'"); + if (defined($rc)) { + return 1; + } + else { + # error + return 0; + }; + } + else { + my @fields = sort {$a cmp $b} keys(%{$h->{data}}); + my @vals = ( "'".$server."'" ); + foreach (@fields) { + my $val = $h->{data}->{$_}; + $val =~ s/\'/\'\'/g; + # shorten $val to limit and remove eventual + # trailing quote and backslash characters. + $val = substr($val,0,255); + $val =~ s/[\\']+$//; + push @vals, "'".$val."'"; + }; + unshift @fields, 'server'; + + my $sql = "INSERT INTO ".$h->{table}." (".join(',',@fields).") VALUES(".join(',',@vals).")"; + my $rc = $dbh->do($sql); + + if (defined($rc)) { + return 1; + } + else { + # error 1062 means "Duplicate key". + return 2 if ($dbh->err == 1062); + print STDERR "SQL Error (code ".$dbh->err.") on '$h->{table}' with query: $sql\n"; + return 0; + }; + }; +}; + + +# -------------------------------------------------------------------------- +# misc subroutines used across several DB types +sub _fetch_multirow { + my $table = shift; + my $sql = shift; + my $limit = shift || 0; + + my $a = []; + my $sh = $dbh->prepare($sql); + $sh->execute; + while (my $tmp = $sh->fetchrow_hashref) { + push @{ $a }, $tmp; + $limit--; + last if ($limit == 0); + }; + $sh->finish; + + return $a; +}; + +sub _build_WHERE { + my $criteria = shift || {}; + + my @set = (); + foreach my $col (keys %{ $criteria }) { + next unless(defined($criteria->{$col})); + + if ( ($col eq "timestamp") || + ($col eq "completed") || + ($col eq "frozen") || + ($col eq "size") ) { + # integer column + my ($min,$max) = split / /,$criteria->{$col}; + + if (defined($min)) { + # greater than X + push @set, $col." > ".$min; + } + if (defined($max)) { + # smaller than X + push @set, $col." < ".$max; + } + } + elsif (ref($criteria->{$col}) eq 'ARRAY') { + # array ref, use exact string match with OR + my $str = "( "; + foreach my $entry (@{ $criteria->{$col} }) { + $str .= " ".$col." = '".$entry."' OR"; + }; + chop($str);chop($str); + $str .= " )"; + + push @set, $str; + } + else { + # string column + if (($criteria->{$col} =~ /\%/) || ($criteria->{$col} =~ /\_/)) { + # use ILIKE for PGSQL + if ($config->{sql}->{type} eq 'pgsql') { + push @set, $col." ILIKE '".$criteria->{$col}."'"; + } + else { + push @set, $col." LIKE '".$criteria->{$col}."'"; + }; + } + else { + push @set, $col." = '".$criteria->{$col}."'"; + }; + }; + }; + + return " WHERE ".join(" AND ", @set); +}; + + +1; |