# Source code for wheezy.core.httpclient

"""
"""

from wheezy.core.collections import attrdict
from wheezy.core.collections import defaultdict
from wheezy.core.comp import HTTPConnection
from wheezy.core.comp import HTTPSConnection
from wheezy.core.comp import SimpleCookie
from wheezy.core.comp import json_loads
from wheezy.core.comp import urlencode
from wheezy.core.comp import urljoin
from wheezy.core.comp import urlsplit
from wheezy.core.gzip import decompress


class HTTPClient(object):
    """ HTTP client sends HTTP requests to server in order to
        accomplish an application specific use cases, e.g. remote
        web server API, etc.

        The client keeps state between requests: cookies received via
        ``Set-Cookie`` are sent back, and ``ETag`` values are remembered
        per request path and sent back as ``If-None-Match``.
    """

    def __init__(self, url, headers=None):
        """
            `url` - a base url for interaction with remote server.
            Only the scheme, network location and path are used.

            `headers` - a dictionary of headers merged into the
            defaults sent with every request.
        """
        scheme, netloc, path, query, fragment = urlsplit(url)
        # Anything other than plain 'http' goes over HTTPS.
        http_class = HTTPConnection if scheme == 'http' else HTTPSConnection
        self.connection = http_class(netloc)
        self.default_headers = {
            'Accept-Encoding': 'gzip',
            'Connection': 'close'
        }
        if headers:
            self.default_headers.update(headers)
        self.path = path
        self.method = None
        self.headers = None
        self.cookies = {}
        self.etags = {}
        self.status_code = 0
        self.body = None
        self.__content = None
        self.__json = None

    @property
    def content(self):
        """ Returns a content of the response: the body decoded as
            UTF-8. The decoded value is cached until the next request.
        """
        if self.__content is None:
            self.__content = self.body.decode('utf-8')
        return self.__content

    @property
    def json(self):
        """ Returns a json response, parsed with ``attrdict`` as the
            object hook. The parsed value is cached until the next
            request.

            Raises ``AssertionError`` if the response content type is
            not ``application/json``.
        """
        if self.__json is None:
            # ``get`` avoids inserting an empty entry into the
            # defaultdict when the header is absent.
            content_types = self.headers.get('content-type')
            # Explicit raise (rather than ``assert``) so the check is
            # not stripped when running with ``-O``.
            if (not content_types
                    or 'application/json' not in content_types[0]):
                raise AssertionError(
                    'Response content type is not application/json')
            self.__json = json_loads(self.content, object_hook=attrdict)
        return self.__json

    def get(self, path, **kwargs):
        """ Sends GET HTTP request.
        """
        return self.go(path, 'GET', **kwargs)

    def ajax_get(self, path, **kwargs):
        """ Sends GET HTTP AJAX request.
        """
        return self.ajax_go(path, 'GET', **kwargs)

    def head(self, path, **kwargs):
        """ Sends HEAD HTTP request.
        """
        return self.go(path, 'HEAD', **kwargs)

    def post(self, path, **kwargs):
        """ Sends POST HTTP request.
        """
        return self.go(path, 'POST', **kwargs)

    def ajax_post(self, path, **kwargs):
        """ Sends POST HTTP AJAX request.
        """
        return self.ajax_go(path, 'POST', **kwargs)

    def follow(self):
        """ Follows HTTP redirect (e.g. status code 302).

            The path and query string of the ``Location`` header are
            requested; scheme, host and fragment are ignored. Status
            codes 307 and 308 repeat the previous method, any other
            accepted code issues a GET.

            Raises ``AssertionError`` if the last response was not one
            of the accepted status codes.
        """
        sc = self.status_code
        # NOTE(review): 207 is kept from the original accepted list;
        # presumably it is used by the server side for AJAX
        # redirects - confirm.
        if sc not in (207, 301, 302, 303, 307, 308):
            raise AssertionError(
                'Status code %s is not a redirect' % sc)
        location = self.headers['location'][0]
        scheme, netloc, path, query, fragment = urlsplit(location)
        if query:
            # Keep the query string, otherwise a redirect to
            # ``/login?next=/x`` would lose its arguments.
            path += '?' + query
        method = (self.method or 'GET') if sc in (307, 308) else 'GET'
        return self.go(path, method)

    def ajax_go(self, path=None, method='GET', params=None, headers=None,
                content_type='', body=''):
        """ Sends HTTP AJAX request to web server.

            Same as ``go`` with the ``X-Requested-With`` header set to
            ``XMLHttpRequest``.
        """
        # Work on a copy so the caller's dictionary is not modified.
        headers = dict(headers) if headers else {}
        headers['X-Requested-With'] = 'XMLHttpRequest'
        return self.go(path, method, params, headers, content_type, body)

    def go(self, path=None, method='GET', params=None, headers=None,
           content_type='', body=''):
        """ Sends HTTP request to web server.

            The ``content_type`` takes priority over ``params`` to use
            ``body``. The ``body`` can be a string or file like object.

            `path` - joined with the base path of the client url.

            `params` - url encoded into the query string for GET,
            or into the request body for any other method.

            `headers` - per request headers; override the defaults.

            Returns the response status code. The response is also
            available through ``status_code``, ``headers`` (lower case
            names mapped to lists of values), ``body``, ``content``
            and ``json``.
        """
        self.method = method
        request_headers = dict(self.default_headers)
        if headers:
            request_headers.update(headers)
        if self.cookies:
            request_headers['Cookie'] = '; '.join(
                '%s=%s' % cookie for cookie in self.cookies.items())
        path = urljoin(self.path, path)
        if content_type:
            request_headers['Content-Type'] = content_type
        elif params:
            if method == 'GET':
                # Extend an existing query string instead of adding
                # a second '?'.
                separator = '&' if '?' in path else '?'
                path += separator + urlencode(params, doseq=True)
            else:
                body = urlencode(params, doseq=True)
                request_headers['Content-Type'] = (
                    'application/x-www-form-urlencoded')
        # Look the etag up by the final request path (including the
        # query string) since that is the key ``process_etag`` stores
        # it under.
        if path in self.etags:
            request_headers['If-None-Match'] = self.etags[path]

        # Reset the state left by the previous response.
        self.status_code = 0
        self.body = None
        self.__content = None
        self.__json = None

        try:
            self.connection.connect()
            self.connection.request(method, path, body, request_headers)
            r = self.connection.getresponse()
            self.body = r.read()
        finally:
            # Always release the connection, even if the request or
            # reading the response fails.
            self.connection.close()

        self.status_code = r.status
        self.headers = defaultdict(list)
        for name, value in r.getheaders():
            # Header names are case insensitive; all lookups in this
            # class use lower case names.
            self.headers[name.lower()].append(value)
        self.process_content_encoding()
        self.process_etag(path)
        self.process_cookies()
        return self.status_code

    # region: internal details

    def process_content_encoding(self):
        """ Decompresses the response body if it is gzip encoded.
        """
        if 'content-encoding' in self.headers \
                and 'gzip' in self.headers['content-encoding']:
            self.body = decompress(self.body)

    def process_etag(self, path):
        """ Remembers the last ``ETag`` response header for `path`.
        """
        if 'etag' in self.headers:
            self.etags[path] = self.headers['etag'][-1]

    def process_cookies(self):
        """ Updates stored cookies from ``Set-Cookie`` response
            headers. A cookie with an empty value is removed.
        """
        if 'set-cookie' in self.headers:
            for cookie_string in self.headers['set-cookie']:
                cookies = SimpleCookie(cookie_string)
                for name in cookies:
                    value = cookies[name].value
                    if value:
                        self.cookies[name] = value
                    elif name in self.cookies:
                        del self.cookies[name]