import re
import os
import sys
+import subprocess
from datetime import datetime, timedelta
from collections import defaultdict
BLOCKCOOLOFF_DELTA=timedelta(hours=1)
# quiet time before an IP is released from the bulk pool
BULKCOOLOFF_DELTA=timedelta(minutes=15)
+# time to check if new accesses appear despite being blocked
+BLOCKCHECK_DELTA=timedelta(minutes=1)
BULKLONG_LIMIT=8000
BULKSHORT_LIMIT=2000
time_regex = r'(?P<t_day>\d\d)/(?P<t_month>[A-Za-z]+)/(?P<t_year>\d\d\d\d):(?P<t_hour>\d\d):(?P<t_min>\d\d):(?P<t_sec>\d\d) [+-]\d\d\d\d'
-format_pat= re.compile(r'(?P<ip>[(\d\.)]+) - - \['+ time_regex + r'] "(?P<query>.*?)" (?P<return>\d+) (?P<bytes>\d+) "(?P<referer>.*?)" "(?P<ua>.*?)"')
-time_pat= re.compile(r'[(\d\.)]+ - - \[' + time_regex + '\] ')
+format_pat= re.compile(r'(?P<ip>[a-f\d\.:]+) - - \['+ time_regex + r'] "(?P<query>.*?)" (?P<return>\d+) (?P<bytes>\d+) "(?P<referer>.*?)" "(?P<ua>.*?)"')
+time_pat= re.compile(r'[a-f\d:\.]+ - - \[' + time_regex + '\] ')
logtime_pat = "%d/%b/%Y:%H:%M:%S %z"
if qp[0] == 'OPTIONS':
self.request = None
else:
- if qp[1].startswith('/search'):
+ if '/?' in qp[1]:
self.request = 'S'
- elif qp[1].startswith('/reverse'):
+ elif '/search' in qp[1]:
+ self.request = 'S'
+ elif '/reverse' in qp[1]:
self.request = 'R'
- elif qp[1].startswith('/details'):
+ elif '/details' in qp[1]:
self.request = 'D'
+ elif '/lookup' in qp[1]:
+ self.request = 'L'
else:
self.request = None
self.query = e['query']
return LogEntry.get_log_time(l) if l is not None else None
def seek_to_date(self, target):
- date1 = self.seek_next(0)
- if date1 > target:
+ # start position for binary search
+ fromseek = 0
+ fromdate = self.seek_next(0)
+ if fromdate > target:
+ return True
+ # end position for binary search
+ toseek = -100
+ while -toseek < self.len:
+ todate = self.seek_next(self.len + toseek)
+ if todate is not None:
+ break
+ toseek -= 100
+ if todate is None or todate < target:
return False
- curseek = 2*1024*1024
- curdate = self.seek_next(curseek)
- if curdate is None:
- raise RuntimeError("Cannot seek to %d" % curseek)
- while target < curdate or (target - curdate).total_seconds() > 1:
- bps = curseek / ((curdate - date1).total_seconds())
- curseek += (target - curdate).total_seconds() * bps
- if curseek < 0:
- curseek = 0
- elif curseek > self.len:
- curseek = self.len - bps
- curdate = self.seek_next(curseek)
- if curdate is None:
- raise RuntimeError("Cannot see to %d" % curseek)
- return True
+ toseek = self.len + toseek
+
+
+ while True:
+ bps = (toseek - fromseek) / (todate - fromdate).total_seconds()
+ newseek = fromseek + int((target - fromdate).total_seconds() * bps)
+ newdate = self.seek_next(newseek)
+ if newdate is None:
+ return False;
+ error = abs((target - newdate).total_seconds())
+ if error < 1:
+ return True
+ if newdate > target:
+ toseek = newseek
+ todate = newdate
+ oldfromseek = fromseek
+ fromseek = toseek - error * bps
+ while True:
+ if fromseek <= oldfromseek:
+ fromseek = oldfromseek
+ fromdate = self.seek_next(fromseek)
+ break
+ fromdate = self.seek_next(fromseek)
+ if fromdate < target:
+ break;
+ bps *=2
+ fromseek -= error * bps
+ else:
+ fromseek = newseek
+ fromdate = newdate
+ oldtoseek = toseek
+ toseek = fromseek + error * bps
+ while True:
+ if toseek > oldtoseek:
+ toseek = oldtoseek
+ todate = self.seek_next(toseek)
+ break
+ todate = self.seek_next(toseek)
+ if todate > target:
+ break
+ bps *=2
+ toseek += error * bps
+ if toseek - fromseek < 500:
+ return True
+
def loglines(self):
for l in self.fd:
- yield LogEntry(l)
+ try:
+ yield LogEntry(l)
+ except ValueError:
+ pass # ignore invalid lines
class BlockList:
class IPstats:
def __init__(self):
+ self.redirected = 0
self.short_total = 0
self.short_api = 0
self.long_total = 0
self.long_api = 0
+ self.block_total = 0
self.bad_ua = False
def add_long(self, logentry):
self.long_total += 1
+ if logentry.retcode == 301:
+ return
if logentry.request is not None:
self.long_api += 1
if not self.bad_ua:
def add_short(self, logentry):
self.short_total += 1
+ if logentry.retcode == 301:
+ self.redirected += 1
+ return
if logentry.request is not None:
self.short_api += 1
self.add_long(logentry)
+ def add_block(self, logentry):
+ self.block_total += 1
+
+ def ignores_warnings(self, wasblocked):
+ return self.block_total > 5 or (wasblocked and self.redirected > 5)
+
def new_state(self, was_blocked, was_bulked):
if was_blocked:
# deblock only if the IP has been really quiet
# (properly catches the ones that simply ignore the HTTP error)
- return None if self.long_total < 5 else 'block'
- if self.long_api > BLOCK_UPPER or self.short_api > BLOCK_UPPER / 3:
+ return None if self.long_total < 20 else 'block'
+ if self.long_api > BLOCK_UPPER \
+ or self.short_api > BLOCK_UPPER / 3 \
+ or (self.redirected > 100 and self.short_total == self.redirected):
# client totally overdoing it
return 'block'
if was_bulked:
- if self.short_total < 5:
+ if self.short_total < 20:
# client has stopped, debulk
return None
if self.long_api > BLOCK_LIMIT or self.short_api > BLOCK_LIMIT / 3:
return 'bulk'
if self.long_api > BULKLONG_LIMIT or self.short_api > BULKSHORT_LIMIT:
- if self.bad_ua:
- return 'uablock' # bad useragent
+ #if self.bad_ua:
+ # return 'uablock' # bad useragent
return 'bulk'
return None
bl = BlockList()
shortstart = dt + BLOCKCOOLOFF_DELTA - BULKCOOLOFF_DELTA
+ blockstart = dt + BLOCKCOOLOFF_DELTA - BLOCKCHECK_DELTA
notlogged = bl.whitelist | bl.blacklist
stats = defaultdict(IPstats)
stats[l.ip].add_short(l)
if l.request is not None and l.retcode == 200:
total200 += 1
+ if l.date > blockstart and l.retcode in (403, 429):
+ stats[l.ip].add_block(l)
# adapt limits according to CPU and DB load
fd = open("/proc/loadavg")
cpuload = int(float(fd.readline().split()[2]))
fd.close()
- dbload = total200 / BULKCOOLOFF_DELTA.total_seconds()
+ # check the number of excess connections to apache
+ dbcons = int(subprocess.check_output("netstat -s | grep 'connections established' | sed 's:^\s*::;s: .*::'", shell=True))
+ fpms = int(subprocess.check_output('ps -Af | grep php-fpm | wc -l', shell=True))
+ dbload = max(0, dbcons - fpms)
numbulks = len(bl.prevbulks)
- BLOCK_LIMIT = max(BLOCK_LIMIT, BLOCK_UPPER - BLOCK_LOADFAC * (dbload - 75))
- BULKLONG_LIMIT = max(BULK_LOWER, BULKLONG_LIMIT - BULK_LOADFAC * (cpuload - 14))
+ BLOCK_LIMIT = max(BLOCK_LIMIT, BLOCK_UPPER - BLOCK_LOADFAC * dbload)
+ BULKLONG_LIMIT = max(BULK_LOWER, BULKLONG_LIMIT - BULK_LOADFAC * cpuload)
if numbulks > MAX_BULK_IPS:
BLOCK_LIMIT = max(3600, BLOCK_LOWER - (numbulks - MAX_BULK_IPS)*10)
# if the bulk pool is still empty, clients will be faster, avoid having
# them blocked in this case
if numbulks < 10:
- BLOCK_LIMIT = 2*BLOCK_UPPER
+ BLOCK_UPPER *= 2
+ BLOCK_LIMIT = BLOCK_UPPER
# collecting statistics
elif wasbulked:
debulked.append(k)
for i in bl.blacklist:
- fd.write("%s ban\n" % k)
+ fd.write("%s ban\n" % i)
fd.close()
# TODO write logs (need to collect some statistics)
- logstr = datetime.now().strftime('%Y-%m-%d %H:%M') + ' %s %s\n'
+ logstr = datetime.now().strftime('%d/%b/%Y:%H:%M:%S') + ' %s %s\n'
fd = open(LOGFILE, 'a')
if unblocked:
fd.write(logstr % ('unblocked:', ', '.join(unblocked)))
fd.write(logstr % (' ua block:', ', '.join(uablocked)))
if blocked:
fd.write(logstr % ('new block:', ', '.join(blocked)))
+ #for k,v in stats.items():
+ # if v.ignores_warnings(k in bl.prevblocks) and k not in notlogged and ':' not in k:
+ # fd.write(logstr % ('Warning ignored:', k))
fd.close()