summaryrefslogtreecommitdiff
path: root/banapedia/Ban.py
blob: d8666b44e98328322bc79555dc448bd269cac883 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from banapedia.wapi.WikipediaQuery import BlockQuery
from datetime import datetime
import pygeoip

# Module author.
__author__ = 'pacien'


# Filesystem path of the GeoIP database handed to pygeoip below.
GEOIP_FILE = "/usr/share/GeoIP/GeoIP.dat"
# Shared pygeoip reader, created once when this module is imported; it is
# used by Ban.get_country_code() to resolve an address to a country code.
geoip = pygeoip.GeoIP(GEOIP_FILE)

# strptime() format used by map_ban() to parse the "timestamp" and "expiry"
# fields of a block entry.
ISO_TIMESTAMP = "%Y-%m-%dT%H:%M:%SZ"


class Ban:
    """A single block entry: the blocked address and its start/end instants.

    The country code of the address is resolved on demand and memoised in
    ``country_code`` (``None`` until the first lookup has completed).
    """

    def __init__(self, ip, start, end):
        """Store the blocked address and the two bounds of the block."""
        self.ip, self.start, self.end = ip, start, end
        self.country_code = None

    def get_duration(self):
        """Return the ``days`` component of ``end - start``."""
        elapsed = self.end - self.start
        return elapsed.days

    def get_country_code(self):
        """Return the lower-cased country code of ``ip``, caching the result.

        When the module-level ``geoip`` reader raises ``pygeoip.GeoIPError``
        an error line is printed and the empty string is cached and returned
        instead.
        """
        if self.country_code is None:
            try:
                resolved = geoip.country_code_by_addr(self.ip).lower()
            except pygeoip.GeoIPError:
                print("[ERROR]", "Could not determine country for ip", self.ip)
                resolved = ""
            # Cache even the empty fallback so a failed lookup is not retried.
            self.country_code = resolved

        return self.country_code


def map_ban(ban_dict):
    """Build a ``Ban`` from one block entry.

    ``ban_dict`` must provide the keys ``"user"``, ``"timestamp"`` and
    ``"expiry"``; the latter two are parsed with ``ISO_TIMESTAMP``.
    """
    # Keys are read in the same order as the constructor arguments.
    address = ban_dict["user"]
    started_at = datetime.strptime(ban_dict["timestamp"], ISO_TIMESTAMP)
    expires_at = datetime.strptime(ban_dict["expiry"], ISO_TIMESTAMP)
    return Ban(address, started_at, expires_at)


def map_bans(ban_dict_list):
    """Return a new list holding ``map_ban(entry)`` for every given entry."""
    return [map_ban(entry) for entry in ban_dict_list]


def fetch_multipart_ban_dict(n, query_limit):
    """Fetch up to ``n`` block entries, requesting at most ``query_limit`` each time.

    Requests are chained through the continuation token found under
    ``results["query-continue"]["blocks"]["bkcontinue"]``.

    :param n: total number of entries wanted; no request is made if ``n <= 0``.
    :param query_limit: maximum number of entries asked for in one request.
    :return: list of the raw block dictionaries taken from
        ``results["query"]["blocks"]``. It may hold fewer than ``n`` entries
        when the server has no more results to offer.
    """
    ban_dict_list = []
    n_fetched = 0
    continue_token = None

    print("[INFO]", "Fetching %d bans" % n)
    while n_fetched < n:
        to_fetch = min(query_limit, n - n_fetched)
        query = BlockQuery(
            bkprop=["user", "timestamp", "expiry"],
            bkshow=["temp", "ip"],
            limit=to_fetch,
            continue_token=continue_token,
        )
        results = query.fetch_result()
        blocks = results["query"]["blocks"]
        ban_dict_list.extend(blocks)
        # Count what was actually received, not what was asked for, so the
        # progress figure and the loop condition stay truthful on short pages.
        n_fetched += len(blocks)
        print("[INFO]", "Fetched %d over %d bans" % (n_fetched, n))

        if n_fetched >= n:
            break

        try:
            continue_token = results["query-continue"]["blocks"]["bkcontinue"]
        except KeyError:
            # No continuation entry in the response: presumably the result
            # set is exhausted, so stop instead of failing and losing the
            # entries gathered so far.
            break

        if not blocks:
            # An empty page would never advance the counter; avoid looping
            # forever on it.
            break

    print("[INFO]", "Bans fetching complete")
    return ban_dict_list