X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/dd85af1b610b7f55bbfb2940a4325337291372ee..ab54a4b8d7ba75f7a1ecd18de34bbce5af291b5b:/utils/cron_ipanalyse.py diff --git a/utils/cron_ipanalyse.py b/utils/cron_ipanalyse.py index 2d0738af..1ca2267f 100755 --- a/utils/cron_ipanalyse.py +++ b/utils/cron_ipanalyse.py @@ -9,6 +9,7 @@ import re import os import sys +import subprocess from datetime import datetime, timedelta from collections import defaultdict @@ -33,6 +34,8 @@ UA_BLOCKLIST = () BLOCKCOOLOFF_DELTA=timedelta(hours=1) # quiet time before an IP is released from the bulk pool BULKCOOLOFF_DELTA=timedelta(minutes=15) +# time to check if new accesses appear despite being blocked +BLOCKCHECK_DELTA=timedelta(minutes=1) BULKLONG_LIMIT=8000 BULKSHORT_LIMIT=2000 @@ -58,8 +61,8 @@ BLOCK_LIMIT = BLOCK_LOWER time_regex = r'(?P\d\d)/(?P[A-Za-z]+)/(?P\d\d\d\d):(?P\d\d):(?P\d\d):(?P\d\d) [+-]\d\d\d\d' -format_pat= re.compile(r'(?P[(\d\.)]+) - - \['+ time_regex + r'] "(?P.*?)" (?P\d+) (?P\d+) "(?P.*?)" "(?P.*?)"') -time_pat= re.compile(r'[(\d\.)]+ - - \[' + time_regex + '\] ') +format_pat= re.compile(r'(?P[a-f\d\.:]+) - - \['+ time_regex + r'] "(?P.*?)" (?P\d+) (?P\d+) "(?P.*?)" "(?P.*?)"') +time_pat= re.compile(r'[a-f\d:\.]+ - - \[' + time_regex + '\] ') logtime_pat = "%d/%b/%Y:%H:%M:%S %z" @@ -84,12 +87,16 @@ class LogEntry: if qp[0] == 'OPTIONS': self.request = None else: - if qp[1].startswith('/search'): + if '/?' in qp[1]: self.request = 'S' - elif qp[1].startswith('/reverse'): + elif '/search' in qp[1]: + self.request = 'S' + elif '/reverse' in qp[1]: self.request = 'R' - elif qp[1].startswith('/details'): + elif '/details' in qp[1]: self.request = 'D' + elif '/lookup' in qp[1]: + self.request = 'L' else: self.request = None self.query = e['query'] @@ -124,28 +131,72 @@ class LogFile: return LogEntry.get_log_time(l) if l is not None else None def seek_to_date(self, target): - date1 = self.seek_next(0) - if date1 > target: + # start position for binary search + fromseek = 0 + fromdate = self.seek_next(0) + if fromdate > target: + return True + # end position for binary search + toseek = -100 + while -toseek < self.len: + todate = self.seek_next(self.len + toseek) + if todate is not None: + break + toseek -= 100 + if todate is None or todate < target: return False - curseek = 2*1024*1024 - curdate = self.seek_next(curseek) - if curdate is None: - raise RuntimeError("Cannot seek to %d" % curseek) - while target < curdate or (target - curdate).total_seconds() > 1: - bps = curseek / ((curdate - date1).total_seconds()) - curseek += (target - curdate).total_seconds() * bps - if curseek < 0: - curseek = 0 - elif curseek > self.len: - curseek = self.len - bps - curdate = self.seek_next(curseek) - if curdate is None: - raise RuntimeError("Cannot see to %d" % curseek) - return True + toseek = self.len + toseek + + + while True: + bps = (toseek - fromseek) / (todate - fromdate).total_seconds() + newseek = fromseek + int((target - fromdate).total_seconds() * bps) + newdate = self.seek_next(newseek) + if newdate is None: + return False; + error = abs((target - newdate).total_seconds()) + if error < 1: + return True + if newdate > target: + toseek = newseek + todate = newdate + oldfromseek = fromseek + fromseek = toseek - error * bps + while True: + if fromseek <= oldfromseek: + fromseek = oldfromseek + fromdate = self.seek_next(fromseek) + break + fromdate = self.seek_next(fromseek) + if fromdate < target: + break; + bps *=2 + fromseek -= error * bps + else: + fromseek = newseek + fromdate = newdate + oldtoseek = toseek + toseek = fromseek + error * bps + while True: + if toseek > oldtoseek: + toseek = oldtoseek + todate = self.seek_next(toseek) + break + todate = self.seek_next(toseek) + if todate > target: + break + bps *=2 + toseek += error * bps + if toseek - fromseek < 500: + return True + def loglines(self): for l in self.fd: - yield LogEntry(l) + try: + yield LogEntry(l) + except ValueError: + pass # ignore invalid lines class BlockList: @@ -176,6 +227,7 @@ class IPstats: self.short_api = 0 self.long_total = 0 self.long_api = 0 + self.block_total = 0 self.bad_ua = False def add_long(self, logentry): @@ -192,16 +244,22 @@ class IPstats: self.short_api += 1 self.add_long(logentry) + def add_block(self, logentry): + self.block_total += 1 + + def ignores_warnings(self): + return self.block_total > 5 + def new_state(self, was_blocked, was_bulked): if was_blocked: # deblock only if the IP has been really quiet # (properly catches the ones that simply ignore the HTTP error) - return None if self.long_total < 5 else 'block' + return None if self.long_total < 20 else 'block' if self.long_api > BLOCK_UPPER or self.short_api > BLOCK_UPPER / 3: # client totally overdoing it return 'block' if was_bulked: - if self.short_total < 5: + if self.short_total < 20: # client has stopped, debulk return None if self.long_api > BLOCK_LIMIT or self.short_api > BLOCK_LIMIT / 3: @@ -210,8 +268,8 @@ class IPstats: return 'bulk' if self.long_api > BULKLONG_LIMIT or self.short_api > BULKSHORT_LIMIT: - if self.bad_ua: - return 'uablock' # bad useragent + #if self.bad_ua: + # return 'uablock' # bad useragent return 'bulk' return None @@ -238,6 +296,7 @@ if __name__ == '__main__': bl = BlockList() shortstart = dt + BLOCKCOOLOFF_DELTA - BULKCOOLOFF_DELTA + blockstart = dt + BLOCKCOOLOFF_DELTA - BLOCKCHECK_DELTA notlogged = bl.whitelist | bl.blacklist stats = defaultdict(IPstats) @@ -254,22 +313,28 @@ if __name__ == '__main__': stats[l.ip].add_short(l) if l.request is not None and l.retcode == 200: total200 += 1 + if l.date > blockstart and l.retcode in (403, 429): + stats[l.ip].add_block(l) # adapt limits according to CPU and DB load fd = open("/proc/loadavg") cpuload = int(float(fd.readline().split()[2])) fd.close() - dbload = total200 / BULKCOOLOFF_DELTA.total_seconds() + # check the number of excess connections to apache + dbcons = int(subprocess.check_output("netstat -s | grep 'connections established' | sed 's:^\s*::;s: .*::'", shell=True)) + fpms = int(subprocess.check_output('ps -Af | grep php-fpm | wc -l', shell=True)) + dbload = max(0, dbcons - fpms) numbulks = len(bl.prevbulks) - BLOCK_LIMIT = max(BLOCK_LIMIT, BLOCK_UPPER - BLOCK_LOADFAC * (dbload - 75)) - BULKLONG_LIMIT = max(BULK_LOWER, BULKLONG_LIMIT - BULK_LOADFAC * (cpuload - 14)) + BLOCK_LIMIT = max(BLOCK_LIMIT, BLOCK_UPPER - BLOCK_LOADFAC * dbload) + BULKLONG_LIMIT = max(BULK_LOWER, BULKLONG_LIMIT - BULK_LOADFAC * cpuload) if numbulks > MAX_BULK_IPS: BLOCK_LIMIT = max(3600, BLOCK_LOWER - (numbulks - MAX_BULK_IPS)*10) # if the bulk pool is still empty, clients will be faster, avoid having # them blocked in this case if numbulks < 10: - BLOCK_LIMIT = 2*BLOCK_UPPER + BLOCK_UPPER *= 2 + BLOCK_LIMIT = BLOCK_UPPER # collecting statistics @@ -305,11 +370,11 @@ if __name__ == '__main__': elif wasbulked: debulked.append(k) for i in bl.blacklist: - fd.write("%s ban\n" % k) + fd.write("%s ban\n" % i) fd.close() # TODO write logs (need to collect some statistics) - logstr = datetime.now().strftime('%Y-%m-%d %H:%M') + ' %s %s\n' + logstr = datetime.now().strftime('%d/%b/%Y:%H:%M:%S') + ' %s %s\n' fd = open(LOGFILE, 'a') if unblocked: fd.write(logstr % ('unblocked:', ', '.join(unblocked))) @@ -323,4 +388,7 @@ if __name__ == '__main__': fd.write(logstr % (' ua block:', ', '.join(uablocked))) if blocked: fd.write(logstr % ('new block:', ', '.join(blocked))) + for k,v in stats.items(): + if v.ignores_warnings() and k not in notlogged and ':' not in k: + fd.write(logstr % ('Warning ignored:', k)) fd.close()