Package cloudfiles :: Module connection
[frames] | no frames]

Source Code for Module cloudfiles.connection

  1  """ 
  2  connection operations 
  3   
  4  Connection instances are used to communicate with the remote service at 
  5  the account level creating, listing and deleting Containers, and returning 
  6  Container instances. 
  7   
  8  See COPYING for license information. 
  9  """ 
 10   
 11  import  socket 
 12  import  os 
 13  from    urllib    import quote 
 14  from    httplib   import HTTPSConnection, HTTPConnection, HTTPException 
 15  from    container import Container, ContainerResults 
 16  from    utils     import parse_url, THTTPConnection, THTTPSConnection 
 17  from    errors    import ResponseError, NoSuchContainer, ContainerNotEmpty, \ 
 18                           InvalidContainerName, CDNNotEnabled 
 19  from    Queue     import Queue, Empty, Full 
 20  from    time      import time 
 21  import  consts 
 22  from    authentication import Authentication 
 23  from    fjson     import json_loads 
 24  from    sys       import version_info 
 25  # Because HTTPResponse objects *have* to have read() called on them 
 26  # before they can be used again ... 
 27  # pylint: disable-msg=W0612 
 28   
 29   
30 -class Connection(object):
31 """ 32 Manages the connection to the storage system and serves as a factory 33 for Container instances. 34 35 @undocumented: cdn_connect 36 @undocumented: http_connect 37 @undocumented: cdn_request 38 @undocumented: make_request 39 @undocumented: _check_container_name 40 """ 41
42 - def __init__(self, username=None, api_key=None, timeout=5, **kwargs):
43 """ 44 Accepts keyword arguments for Mosso username and api key. 45 Optionally, you can omit these keywords and supply an 46 Authentication object using the auth keyword. Setting the argument 47 servicenet to True will make use of Rackspace servicenet network. 48 49 @type username: str 50 @param username: a Mosso username 51 @type api_key: str 52 @param api_key: a Mosso API key 53 @type servicenet: bool 54 @param servicenet: Use Rackspace servicenet to access Cloud Files. 55 @type cdn_log_retention: bool 56 @param cdn_log_retention: set logs retention for this cdn enabled 57 container. 58 """ 59 self.cdn_enabled = False 60 self.cdn_args = None 61 self.connection_args = None 62 self.cdn_connection = None 63 self.connection = None 64 self.token = None 65 self.debuglevel = int(kwargs.get('debuglevel', 0)) 66 self.servicenet = kwargs.get('servicenet', False) 67 self.user_agent = kwargs.get('useragent', consts.user_agent) 68 self.timeout = timeout 69 70 # if the environement variable RACKSPACE_SERVICENET is set (to 71 # anything) it will automatically set servicenet=True 72 if not 'servicenet' in kwargs \ 73 and 'RACKSPACE_SERVICENET' in os.environ: 74 self.servicenet = True 75 76 self.auth = 'auth' in kwargs and kwargs['auth'] or None 77 78 if not self.auth: 79 authurl = kwargs.get('authurl', consts.us_authurl) 80 if username and api_key and authurl: 81 self.auth = Authentication(username, api_key, authurl=authurl, 82 useragent=self.user_agent) 83 else: 84 raise TypeError("Incorrect or invalid arguments supplied") 85 86 self._authenticate()
87
88 - def _authenticate(self):
89 """ 90 Authenticate and setup this instance with the values returned. 91 """ 92 (url, self.cdn_url, self.token) = self.auth.authenticate() 93 url = self._set_storage_url(url) 94 self.connection_args = parse_url(url) 95 96 if version_info[0] <= 2 and version_info[1] < 6: 97 self.conn_class = self.connection_args[3] and THTTPSConnection or \ 98 THTTPConnection 99 else: 100 self.conn_class = self.connection_args[3] and HTTPSConnection or \ 101 HTTPConnection 102 self.http_connect() 103 if self.cdn_url: 104 self.cdn_connect()
105
106 - def _set_storage_url(self, url):
107 if self.servicenet: 108 return "https://snet-%s" % url.replace("https://", "") 109 return url
110
111 - def cdn_connect(self):
112 """ 113 Setup the http connection instance for the CDN service. 114 """ 115 (host, port, cdn_uri, is_ssl) = parse_url(self.cdn_url) 116 self.cdn_connection = self.conn_class(host, port, timeout=self.timeout) 117 self.cdn_enabled = True
118
119 - def http_connect(self):
120 """ 121 Setup the http connection instance. 122 """ 123 (host, port, self.uri, is_ssl) = self.connection_args 124 self.connection = self.conn_class(host, port=port, \ 125 timeout=self.timeout) 126 self.connection.set_debuglevel(self.debuglevel)
127
128 - def cdn_request(self, method, path=[], data='', hdrs=None):
129 """ 130 Given a method (i.e. GET, PUT, POST, etc), a path, data, header and 131 metadata dicts, performs an http request against the CDN service. 132 """ 133 if not self.cdn_enabled: 134 raise CDNNotEnabled() 135 136 path = '/%s/%s' % \ 137 (self.uri.rstrip('/'), '/'.join([quote(i) for i in path])) 138 headers = {'Content-Length': str(len(data)), 139 'User-Agent': self.user_agent, 140 'X-Auth-Token': self.token} 141 if isinstance(hdrs, dict): 142 headers.update(hdrs) 143 144 def retry_request(): 145 '''Re-connect and re-try a failed request once''' 146 self.cdn_connect() 147 self.cdn_connection.request(method, path, data, headers) 148 return self.cdn_connection.getresponse()
149 150 try: 151 self.cdn_connection.request(method, path, data, headers) 152 response = self.cdn_connection.getresponse() 153 except (socket.error, IOError, HTTPException): 154 response = retry_request() 155 if response.status == 401: 156 self._authenticate() 157 headers['X-Auth-Token'] = self.token 158 response = retry_request() 159 160 return response
161
162 - def make_request(self, method, path=[], data='', hdrs=None, parms=None):
163 """ 164 Given a method (i.e. GET, PUT, POST, etc), a path, data, header and 165 metadata dicts, and an optional dictionary of query parameters, 166 performs an http request. 167 """ 168 path = '/%s/%s' % \ 169 (self.uri.rstrip('/'), '/'.join([quote(i) for i in path])) 170 171 if isinstance(parms, dict) and parms: 172 query_args = \ 173 ['%s=%s' % (quote(x), 174 quote(str(y))) for (x, y) in parms.items()] 175 path = '%s?%s' % (path, '&'.join(query_args)) 176 177 headers = {'Content-Length': str(len(data)), 178 'User-Agent': self.user_agent, 179 'X-Auth-Token': self.token} 180 isinstance(hdrs, dict) and headers.update(hdrs) 181 182 def retry_request(): 183 '''Re-connect and re-try a failed request once''' 184 self.http_connect() 185 self.connection.request(method, path, data, headers) 186 return self.connection.getresponse()
187 188 try: 189 self.connection.request(method, path, data, headers) 190 response = self.connection.getresponse() 191 except (socket.error, IOError, HTTPException): 192 response = retry_request() 193 if response.status == 401: 194 self._authenticate() 195 headers['X-Auth-Token'] = self.token 196 response = retry_request() 197 198 return response 199
200 - def get_info(self):
201 """ 202 Return tuple for number of containers and total bytes in the account 203 204 >>> connection.get_info() 205 (5, 2309749) 206 207 @rtype: tuple 208 @return: a tuple containing the number of containers and total bytes 209 used by the account 210 """ 211 response = self.make_request('HEAD') 212 count = size = None 213 for hdr in response.getheaders(): 214 if hdr[0].lower() == 'x-account-container-count': 215 try: 216 count = int(hdr[1]) 217 except ValueError: 218 count = 0 219 if hdr[0].lower() == 'x-account-bytes-used': 220 try: 221 size = int(hdr[1]) 222 except ValueError: 223 size = 0 224 buff = response.read() 225 if (response.status < 200) or (response.status > 299): 226 raise ResponseError(response.status, response.reason) 227 return (count, size)
228
229 - def _check_container_name(self, container_name):
230 if not container_name or \ 231 '/' in container_name or \ 232 len(container_name) > consts.container_name_limit: 233 raise InvalidContainerName(container_name)
234
235 - def create_container(self, container_name):
236 """ 237 Given a container name, returns a L{Container} item, creating a new 238 Container if one does not already exist. 239 240 >>> connection.create_container('new_container') 241 <cloudfiles.container.Container object at 0xb77d628c> 242 243 @param container_name: name of the container to create 244 @type container_name: str 245 @rtype: L{Container} 246 @return: an object representing the newly created container 247 """ 248 self._check_container_name(container_name) 249 250 response = self.make_request('PUT', [container_name]) 251 buff = response.read() 252 if (response.status < 200) or (response.status > 299): 253 raise ResponseError(response.status, response.reason) 254 return Container(self, container_name)
255
256 - def delete_container(self, container_name):
257 """ 258 Given a container name, delete it. 259 260 >>> connection.delete_container('old_container') 261 262 @param container_name: name of the container to delete 263 @type container_name: str 264 """ 265 if isinstance(container_name, Container): 266 container_name = container_name.name 267 self._check_container_name(container_name) 268 269 response = self.make_request('DELETE', [container_name]) 270 buff = response.read() 271 272 if (response.status == 409): 273 raise ContainerNotEmpty(container_name) 274 elif (response.status < 200) or (response.status > 299): 275 raise ResponseError(response.status, response.reason) 276 277 if self.cdn_enabled: 278 response = self.cdn_request('POST', [container_name], 279 hdrs={'X-CDN-Enabled': 'False'})
280
281 - def get_all_containers(self, limit=None, marker=None, **parms):
282 """ 283 Returns a Container item result set. 284 285 >>> connection.get_all_containers() 286 ContainerResults: 4 containers 287 >>> print ', '.join([container.name for container in 288 connection.get_all_containers()]) 289 new_container, old_container, pictures, music 290 291 @rtype: L{ContainerResults} 292 @return: an iterable set of objects representing all containers on the 293 account 294 @param limit: number of results to return, up to 10,000 295 @type limit: int 296 @param marker: return only results whose name is greater than "marker" 297 @type marker: str 298 """ 299 if limit: 300 parms['limit'] = limit 301 if marker: 302 parms['marker'] = marker 303 return ContainerResults(self, self.list_containers_info(**parms))
304
305 - def get_container(self, container_name):
306 """ 307 Return a single Container item for the given Container. 308 309 >>> connection.get_container('old_container') 310 <cloudfiles.container.Container object at 0xb77d628c> 311 >>> container = connection.get_container('old_container') 312 >>> container.size_used 313 23074 314 315 @param container_name: name of the container to create 316 @type container_name: str 317 @rtype: L{Container} 318 @return: an object representing the container 319 """ 320 self._check_container_name(container_name) 321 322 response = self.make_request('HEAD', [container_name]) 323 count = size = None 324 for hdr in response.getheaders(): 325 if hdr[0].lower() == 'x-container-object-count': 326 try: 327 count = int(hdr[1]) 328 except ValueError: 329 count = 0 330 if hdr[0].lower() == 'x-container-bytes-used': 331 try: 332 size = int(hdr[1]) 333 except ValueError: 334 size = 0 335 buff = response.read() 336 if response.status == 404: 337 raise NoSuchContainer(container_name) 338 if (response.status < 200) or (response.status > 299): 339 raise ResponseError(response.status, response.reason) 340 return Container(self, container_name, count, size)
341
342 - def list_public_containers(self):
343 """ 344 Returns a list of containers that have been published to the CDN. 345 346 >>> connection.list_public_containers() 347 ['container1', 'container2', 'container3'] 348 349 @rtype: list(str) 350 @return: a list of all CDN-enabled container names as strings 351 """ 352 response = self.cdn_request('GET', ['']) 353 if (response.status < 200) or (response.status > 299): 354 buff = response.read() 355 raise ResponseError(response.status, response.reason) 356 return response.read().splitlines()
357
358 - def list_containers_info(self, limit=None, marker=None, **parms):
359 """ 360 Returns a list of Containers, including object count and size. 361 362 >>> connection.list_containers_info() 363 [{u'count': 510, u'bytes': 2081717, u'name': u'new_container'}, 364 {u'count': 12, u'bytes': 23074, u'name': u'old_container'}, 365 {u'count': 0, u'bytes': 0, u'name': u'container1'}, 366 {u'count': 0, u'bytes': 0, u'name': u'container2'}, 367 {u'count': 0, u'bytes': 0, u'name': u'container3'}, 368 {u'count': 3, u'bytes': 2306, u'name': u'test'}] 369 370 @rtype: list({"name":"...", "count":..., "bytes":...}) 371 @return: a list of all container info as dictionaries with the 372 keys "name", "count", and "bytes" 373 @param limit: number of results to return, up to 10,000 374 @type limit: int 375 @param marker: return only results whose name is greater than "marker" 376 @type marker: str 377 """ 378 if limit: 379 parms['limit'] = limit 380 if marker: 381 parms['marker'] = marker 382 parms['format'] = 'json' 383 response = self.make_request('GET', [''], parms=parms) 384 if (response.status < 200) or (response.status > 299): 385 buff = response.read() 386 raise ResponseError(response.status, response.reason) 387 return json_loads(response.read())
388
389 - def list_containers(self, limit=None, marker=None, **parms):
390 """ 391 Returns a list of Containers. 392 393 >>> connection.list_containers() 394 ['new_container', 395 'old_container', 396 'container1', 397 'container2', 398 'container3', 399 'test'] 400 401 @rtype: list(str) 402 @return: a list of all containers names as strings 403 @param limit: number of results to return, up to 10,000 404 @type limit: int 405 @param marker: return only results whose name is greater than "marker" 406 @type marker: str 407 """ 408 if limit: 409 parms['limit'] = limit 410 if marker: 411 parms['marker'] = marker 412 response = self.make_request('GET', [''], parms=parms) 413 if (response.status < 200) or (response.status > 299): 414 buff = response.read() 415 raise ResponseError(response.status, response.reason) 416 return response.read().splitlines()
417
418 - def __getitem__(self, key):
419 """ 420 Container objects can be grabbed from a connection using index 421 syntax. 422 423 >>> container = conn['old_container'] 424 >>> container.size_used 425 23074 426 427 @rtype: L{Container} 428 @return: an object representing the container 429 """ 430 return self.get_container(key)
431 432
433 -class ConnectionPool(Queue):
434 """ 435 A thread-safe connection pool object. 436 437 This component isn't required when using the cloudfiles library, but it may 438 be useful when building threaded applications. 439 """ 440
441 - def __init__(self, username=None, api_key=None, **kwargs):
442 auth = kwargs.get('auth', None) 443 self.timeout = kwargs.get('timeout', 5) 444 self.connargs = {'username': username, 'api_key': api_key} 445 poolsize = kwargs.get('poolsize', 10) 446 Queue.__init__(self, poolsize)
447
448 - def get(self):
449 """ 450 Return a cloudfiles connection object. 451 452 @rtype: L{Connection} 453 @return: a cloudfiles connection object 454 """ 455 try: 456 (create, connobj) = Queue.get(self, block=0) 457 except Empty: 458 connobj = Connection(**self.connargs) 459 return connobj
460
461 - def put(self, connobj):
462 """ 463 Place a cloudfiles connection object back into the pool. 464 465 @param connobj: a cloudfiles connection object 466 @type connobj: L{Connection} 467 """ 468 try: 469 Queue.put(self, (time(), connobj), block=0) 470 except Full: 471 del connobj
472 # vim:set ai sw=4 ts=4 tw=0 expandtab: 473