git.openstreetmap.org Git - osqa.git/blob - forum/models/utils.py
Fixes a possible future bug in edit user.
[osqa.git] / forum / models / utils.py
1 from django.db import models
2 from django.core.cache import cache
3 from django.conf import settings
4 from django.utils.encoding import force_unicode
5
6 try:
7     from cPickle import loads, dumps
8 except ImportError:
9     from pickle import loads, dumps
10
11 from copy import deepcopy
12 from base64 import b64encode, b64decode
13 from zlib import compress, decompress
14 import re
15
16 from base import BaseModel
17
18 MAX_MARKABLE_STRING_LENGTH = 100
19
class PickledObject(str):
    """Marker ``str`` subclass for values already encoded for storage.

    It adds no behaviour of its own.  ``PickledObjectField`` uses
    ``isinstance(value, PickledObject)`` to skip re-encoding in
    ``get_db_prep_value`` and to decide whether a decoding failure in
    ``to_python`` should propagate.
    """
22
def dbsafe_encode(value, compress_object=True):
    """Serialize ``value`` into a text-safe ``PickledObject``.

    A deep copy of ``value`` is pickled, optionally zlib-compressed, and
    then base64-encoded.

    Args:
        value: the Python object to serialize.
        compress_object: when true (the default), zlib-compress the pickle
            before base64-encoding it.

    Returns:
        The encoded payload wrapped in ``PickledObject``.
    """
    payload = dumps(deepcopy(value))
    if compress_object:
        payload = compress(payload)
    return PickledObject(b64encode(payload))
29
def dbsafe_decode(value, compress_object=True):
    """Reverse ``dbsafe_encode``: rebuild the Python object from its text form.

    Args:
        value: base64 text holding a pickle, zlib-compressed when
            ``compress_object`` is true.
        compress_object: must match the setting used when the value was
            encoded (default True).

    Returns:
        The unpickled Python object.

    Note:
        The payload is unpickled, so only trusted data should be passed in.
    """
    payload = b64decode(value)
    if compress_object:
        payload = decompress(payload)
    return loads(payload)
36
class PickledObjectField(models.Field):
    """Model field that stores an arbitrary Python value in a text column.

    Short ``str``/``int``/``unicode`` values are written in a readable
    type-marked form (``T[<type>]<value>``).  Every other value is encoded
    with ``dbsafe_encode`` (pickle, optional zlib compression, base64).

    Keyword arguments (popped before the base constructor is called):
        compress: whether pickled values are zlib-compressed (default True).
        protocol: stored on the field (default 2).  Nothing in this class
            reads it back, so it does not affect the encoding done here.

    ``null`` defaults to True and ``editable`` to False unless the caller
    overrides them.
    """
    # Per Django's custom-field documentation, SubfieldBase makes attribute
    # assignment on the model instance go through to_python().
    __metaclass__ = models.SubfieldBase

    # Matches the type-marked form; DOTALL lets the value part span newlines.
    marker_re = re.compile(r'^T\[(?P<type>\w+)\](?P<value>.*)$', re.DOTALL)
    # Type name -> constructor for the types that may be stored in marked form.
    markable_types = dict((t.__name__, t) for t in (str, int, unicode))

    def __init__(self, *args, **kwargs):
        """Pop the field-specific options and apply the null/editable defaults."""
        self.compress = kwargs.pop('compress', True)
        self.protocol = kwargs.pop('protocol', 2)
        kwargs.setdefault('null', True)
        kwargs.setdefault('editable', False)
        super(PickledObjectField, self).__init__(*args, **kwargs)

    def generate_type_marked_value(self, value):
        """Return ``value`` as ``T[<type name>]<value>`` wrapped in PickledObject."""
        return PickledObject("T[%s]%s" % (type(value).__name__, value))

    def read_marked_value(self, value):
        """Parse a type-marked string back into a Python value.

        If the marker names one of ``markable_types`` the payload is
        converted with that type.  With an unknown marker the payload is
        returned as the matched string.  A string that does not match the
        marker pattern is returned unchanged.
        """
        m = self.marker_re.match(value)

        if m:
            marker = m.group('type')
            value = m.group('value')
            if marker in self.markable_types:
                value = self.markable_types[marker](value)

        return value

    def get_default(self):
        """Return the field default as-is, calling it first when callable.

        Falls back to the base implementation when no default is set.
        NOTE(review): presumably overridden so the default is not coerced to
        text by the base class -- confirm against the Django version in use.
        """
        if self.has_default():
            if callable(self.default):
                return self.default()
            return self.default

        return super(PickledObjectField, self).get_default()

    def to_python(self, value):
        """Decode a stored value; pass through anything that cannot be decoded.

        Strings starting with ``T[`` are read as type-marked values; anything
        else is handed to ``dbsafe_decode``.  ``None`` is returned unchanged.

        Raises:
            Whatever the decoding step raised, but only when ``value`` is a
            ``PickledObject`` (i.e. it was produced by the encoder and should
            have been decodable).
        """
        if value is not None:
            try:
                if value.startswith("T["):
                    value = self.read_marked_value(value)
                else:
                    value = dbsafe_decode(value, self.compress)
            except Exception:
                # Best effort: a value that is not a PickledObject is
                # presumably an already-decoded Python object, so it is
                # returned untouched.  Catching Exception rather than using a
                # bare except lets KeyboardInterrupt/SystemExit propagate.
                if isinstance(value, PickledObject):
                    raise
        return value

    def get_db_prep_value(self, value):
        """Encode ``value`` for storage.

        ``None`` and values that are already ``PickledObject`` pass through
        unchanged.  Values whose type name is in ``markable_types`` are
        written in marked form, except strings longer than
        ``MAX_MARKABLE_STRING_LENGTH``; everything else is pickled with
        ``dbsafe_encode``.
        """
        if value is not None and not isinstance(value, PickledObject):
            if type(value).__name__ in self.markable_types and not (isinstance(value, basestring) and len(value) > MAX_MARKABLE_STRING_LENGTH):
                value = force_unicode(self.generate_type_marked_value(value))
            else:
                value = force_unicode(dbsafe_encode(value, self.compress))
        return value

    def value_to_string(self, obj):
        """Return the stored (encoded) representation of the field on ``obj``."""
        value = self._get_val_from_obj(obj)
        return self.get_db_prep_value(value)

    def get_internal_type(self):
        """Report the column type this field is stored as."""
        return 'TextField'

    def get_db_prep_lookup(self, lookup_type, value):
        """Prepare ``value`` for a query; only exact/in/isnull are allowed.

        Raises:
            TypeError: for any other lookup type.
        """
        if lookup_type not in ['exact', 'in', 'isnull']:
            raise TypeError('Lookup type %s is not supported.' % lookup_type)
        return super(PickledObjectField, self).get_db_prep_lookup(lookup_type, value)
103
104
class KeyValue(BaseModel):
    """Key/value row: a unique string key mapped to a pickled Python value."""
    # Unique lookup key, at most 255 characters.
    key = models.CharField(max_length=255, unique=True)
    # Arbitrary Python value, serialized by PickledObjectField.
    value = PickledObjectField()

    class Meta:
        app_label = 'forum'

    def cache_key(self):
        """Return the cache key derived from this row's ``key``."""
        # _generate_cache_key is not defined in this class; presumably it is
        # inherited from BaseModel.
        return self._generate_cache_key(self.key)

    @classmethod
    def infer_cache_key(cls, querydict):
        """Derive the cache key from query filter arguments, if possible.

        Args:
            querydict: mapping of lookup names to values.

        Returns:
            The cache key built from the ``key`` or ``key__exact`` entry, or
            None when neither is present or the key cannot be generated.
        """
        try:
            key = [v for (k, v) in querydict.items() if k in ('key', 'key__exact')][0]

            return cls._generate_cache_key(key)
        except Exception:
            # Best effort: any ordinary failure means "no cache key".
            # Catching Exception rather than using a bare except lets
            # KeyboardInterrupt/SystemExit propagate.
            return None
123