+# -*- coding: utf-8 -*-
+
import re
+from django.utils.encoding import smart_unicode
from django.utils.html import escape
-from django.http import get_host
from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication
import settings
# Attribute Exchange (AX) attributes to request: maps the local data-type name
# (used as the AX alias and as the key in the collected consumer data) to the
# AX schema type URI. A tuple value is read as (schema URI, alias).
dataype2ax_schema = {
'username': 'http://axschema.org/namePerson/friendly',
'email': 'http://axschema.org/contact/email',
- 'web': 'http://axschema.org/contact/web/default',
+ #'web': 'http://axschema.org/contact/web/default',
#'firstname': 'http://axschema.org/namePerson/first',
#'lastname': 'http://axschema.org/namePerson/last',
- 'birthdate': 'http://axschema.org/birthDate',
+ #'birthdate': 'http://axschema.org/birthDate',
+ }
+
# Simple Registration (SReg) fields to request. Each top-level key names a group
# of fields (only the "required" group is requested as required); a "policy_url"
# key, if present, holds the policy URL instead of a field group. Each inner
# dict maps the SReg field name to the local key under which the returned value
# is stored in the consumer data.
+ sreg_attributes = {
+ "required": {
+ "email": "email",
+ "nickname": "username",
+ "fullname": "real_name"
+ }
}
def get_user_url(self, request):
def prepare_authentication_request(self, request, redirect_to):
if not redirect_to.startswith('http://') or redirect_to.startswith('https://'):
- redirect_to = get_url_host(request) + redirect_to
+ redirect_to = get_url_host(request) + redirect_to
user_url = self.get_user_url(request)
except DiscoveryFailure:
raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))
- #sreg = getattr(settings, 'OPENID_SREG', False)
+ sreg = getattr(self, 'sreg_attributes', False)
+
+ if sreg:
+ s = SRegRequest()
- #if sreg:
- # s = SRegRequest()
- # for sarg in sreg:
- # if sarg.lower().lstrip() == "policy_url":
- # s.policy_url = sreg[sarg]
- # else:
- # for v in sreg[sarg].split(','):
- # s.requestField(field_name=v.lower().lstrip(), required=(sarg.lower().lstrip() == "required"))
- # auth_request.addExtension(s)
+ for k, attr_dic in sreg.items():
+ if k == "policy_url":
+ s.policy_url = attr_dic
+ continue
- #auth_request.addExtension(SRegRequest(required=['email']))
+ for attr_name in attr_dic.keys():
+ s.requestField(field_name=attr_name, required=(k == "required"))
- if request.session.get('force_email_request', True):
+ auth_request.addExtension(s)
+
+ ax_schema = getattr(self, 'dataype2ax_schema', False)
+
+ if ax_schema and request.session.get('force_email_request', True):
axr = AXFetchRequest()
- for data_type, schema in self.dataype2ax_schema.items():
+ for data_type, schema in ax_schema.items():
if isinstance(schema, tuple):
- axr.add(AttrInfo(schema[0], 1, True, schema[1]))
+ axr.add(AttrInfo(schema[0], required=True, alias=schema[1]))
else:
- axr.add(AttrInfo(schema, 1, True, data_type))
+ axr.add(AttrInfo(schema, required=True, alias=data_type))
auth_request.addExtension(axr)
consumer = Consumer(request.session, OsqaOpenIDStore())
query_dict = dict([
- (k.encode('utf8'), v.encode('utf8')) for k, v in request.GET.items()
+ (smart_unicode(k), smart_unicode(v)) for k, v in request.GET.items()
])
#for i in query_dict.items():
openid_response = consumer.complete(query_dict, url)
if openid_response.status == SUCCESS:
- if request.session.get('force_email_request', True):
- try:
- ax = AXFetchResponse.fromSuccessResponse(openid_response)
+ consumer_data = {}
+
+ sreg_attrs = getattr(self, 'sreg_attributes', False)
+
+ if sreg_attrs:
+ sreg_response = SRegResponse.fromSuccessResponse(openid_response)
+
+ if sreg_response:
+ all_attrs = {}
+ [all_attrs.update(d) for k,d in sreg_attrs.items() if k != "policy_url"]
+
+ for attr_name, local_name in all_attrs.items():
+ if attr_name in sreg_response:
+ consumer_data[local_name] = sreg_response[attr_name]
+
+ ax_schema = getattr(self, 'dataype2ax_schema', False)
+
+ if ax_schema:
# NOTE(review): the second positional argument appears to be python-openid's
# `signed` flag; passing False would accept AX values that are not covered by
# the provider's signature. Confirm this relaxation is intentional.
+ ax = AXFetchResponse.fromSuccessResponse(openid_response, False)
+
+ if ax:
axargs = ax.getExtensionArgs()
- ax_schema2data_type = dict([(s, t) for t, s in self.dataype2ax_schema.items()])
+ ax_schema2data_type = dict([(s, t) for t, s in ax_schema.items()])
available_types = dict([
(ax_schema2data_type[s], re.sub('^type\.', '', n))
for n, s in axargs.items() if s in ax_schema2data_type
])
- #available_data = dict([
- # (t, axargs["value.%s.1" % s]) for t, s in available_types.items()
- #])
-
- #print available_data
-
-
- #email = ax.getExtensionArgs()['value.ext0.1']
- #username = ax.getExtensionArgs()['value.ext0.2']
+ for t, s in available_types.items():
+ if not t in consumer_data:
+ if axargs.get("value.%s.1" % s, None):
+ consumer_data[t] = axargs["value.%s.1" % s]
- request.session['auth_consumer_data'] = {
- 'email': '',
- 'username': ''
- }
+ request.session['auth_consumer_data'] = consumer_data
- except Exception, e:
- import sys, traceback
- traceback.print_exc(file=sys.stdout)
return request.GET['openid.identity']
elif openid_response.status == CANCEL:
protocol = 'https'
else:
protocol = 'http'
- host = escape(get_host(request))
+ host = escape(request.get_host())
return '%s://%s' % (protocol, host)
# Return the absolute URL of the current request: the result of
# get_url_host(request) followed by request.get_full_path().
def get_full_url(request):
- return get_url_host(request) + request.get_full_path()
\ No newline at end of file
+ return get_url_host(request) + request.get_full_path()