Package cloudfiles :: Module connection
[frames] | no frames]

Source Code for Module cloudfiles.connection

  1  """ 
  2  connection operations 
  3   
  4  Connection instances are used to communicate with the remote service at 
  5  the account level creating, listing and deleting Containers, and returning 
  6  Container instances. 
  7   
  8  See COPYING for license information. 
  9  """ 
 10   
 11  import  socket 
 12  import  os 
 13  from    urllib    import urlencode 
 14  from    httplib   import HTTPSConnection, HTTPConnection, HTTPException 
 15  from    container import Container, ContainerResults 
 16  from    utils     import unicode_quote, parse_url, THTTPConnection, THTTPSConnection 
 17  from    errors    import ResponseError, NoSuchContainer, ContainerNotEmpty, \ 
 18                           InvalidContainerName, CDNNotEnabled, ContainerExists 
 19  from    Queue     import Queue, Empty, Full 
 20  from    time      import time 
 21  import  consts 
 22  from    authentication import Authentication 
 23  from    fjson     import json_loads 
 24  from    sys       import version_info 
 25  # Because HTTPResponse objects *have* to have read() called on them 
 26  # before they can be used again ... 
 27  # pylint: disable-msg=W0612 
 28   
 29   
30 -class Connection(object):
31 """ 32 Manages the connection to the storage system and serves as a factory 33 for Container instances. 34 35 @undocumented: cdn_connect 36 @undocumented: http_connect 37 @undocumented: cdn_request 38 @undocumented: make_request 39 @undocumented: _check_container_name 40 """ 41
42 - def __init__(self, username=None, api_key=None, timeout=5, **kwargs):
43 """ 44 Accepts keyword arguments for Mosso username and api key. 45 Optionally, you can omit these keywords and supply an 46 Authentication object using the auth keyword. Setting the argument 47 servicenet to True will make use of Rackspace servicenet network. 48 49 @type username: str 50 @param username: a Mosso username 51 @type api_key: str 52 @param api_key: a Mosso API key 53 @type servicenet: bool 54 @param servicenet: Use Rackspace servicenet to access Cloud Files. 55 @type cdn_log_retention: bool 56 @param cdn_log_retention: set logs retention for this cdn enabled 57 container. 58 """ 59 self.cdn_enabled = False 60 self.cdn_args = None 61 self.connection_args = None 62 self.cdn_connection = None 63 self.connection = None 64 self.token = None 65 self.debuglevel = int(kwargs.get('debuglevel', 0)) 66 self.servicenet = kwargs.get('servicenet', False) 67 self.user_agent = kwargs.get('useragent', consts.user_agent) 68 self.timeout = timeout 69 70 # if the environement variable RACKSPACE_SERVICENET is set (to 71 # anything) it will automatically set servicenet=True 72 if not 'servicenet' in kwargs \ 73 and 'RACKSPACE_SERVICENET' in os.environ: 74 self.servicenet = True 75 76 self.auth = 'auth' in kwargs and kwargs['auth'] or None 77 78 if not self.auth: 79 authurl = kwargs.get('authurl', consts.us_authurl) 80 if username and api_key and authurl: 81 self.auth = Authentication(username, api_key, authurl=authurl, 82 useragent=self.user_agent, timeout=self.timeout) 83 else: 84 raise TypeError("Incorrect or invalid arguments supplied") 85 self._authenticate()
86 - def _authenticate(self):
87 """ 88 Authenticate and setup this instance with the values returned. 89 """ 90 (url, self.cdn_url, self.token) = self.auth.authenticate() 91 url = self._set_storage_url(url) 92 self.connection_args = parse_url(url) 93 94 if version_info[0] <= 2 and version_info[1] < 6: 95 self.conn_class = self.connection_args[3] and THTTPSConnection or \ 96 THTTPConnection 97 else: 98 self.conn_class = self.connection_args[3] and HTTPSConnection or \ 99 HTTPConnection 100 self.http_connect() 101 if self.cdn_url: 102 self.cdn_connect()
103
104 - def _set_storage_url(self, url):
105 if self.servicenet: 106 return "https://snet-%s" % url.replace("https://", "") 107 return url
108
109 - def cdn_connect(self):
110 """ 111 Setup the http connection instance for the CDN service. 112 """ 113 (host, port, cdn_uri, is_ssl) = parse_url(self.cdn_url) 114 self.cdn_connection = self.conn_class(host, port, timeout=self.timeout) 115 self.cdn_enabled = True
116
117 - def http_connect(self):
118 """ 119 Setup the http connection instance. 120 """ 121 (host, port, self.uri, is_ssl) = self.connection_args 122 self.connection = self.conn_class(host, port=port, \ 123 timeout=self.timeout) 124 self.connection.set_debuglevel(self.debuglevel)
125
126 - def cdn_request(self, method, path=[], data='', hdrs=None):
127 """ 128 Given a method (i.e. GET, PUT, POST, etc), a path, data, header and 129 metadata dicts, performs an http request against the CDN service. 130 """ 131 if not self.cdn_enabled: 132 raise CDNNotEnabled() 133 134 path = '/%s/%s' % \ 135 (self.uri.rstrip('/'), '/'.join([unicode_quote(i) for i in path])) 136 headers = {'Content-Length': str(len(data)), 137 'User-Agent': self.user_agent, 138 'X-Auth-Token': self.token} 139 if isinstance(hdrs, dict): 140 headers.update(hdrs) 141 142 def retry_request(): 143 '''Re-connect and re-try a failed request once''' 144 self.cdn_connect() 145 self.cdn_connection.request(method, path, data, headers) 146 return self.cdn_connection.getresponse()
147 148 try: 149 self.cdn_connection.request(method, path, data, headers) 150 response = self.cdn_connection.getresponse() 151 except (socket.error, IOError, HTTPException): 152 response = retry_request() 153 if response.status == 401: 154 self._authenticate() 155 headers['X-Auth-Token'] = self.token 156 response = retry_request() 157 158 return response
159
160 - def make_request(self, method, path=[], data='', hdrs=None, parms=None):
161 """ 162 Given a method (i.e. GET, PUT, POST, etc), a path, data, header and 163 metadata dicts, and an optional dictionary of query parameters, 164 performs an http request. 165 """ 166 path = '/%s/%s' % \ 167 (self.uri.rstrip('/'), '/'.join([unicode_quote(i) for i in path])) 168 169 if isinstance(parms, dict) and parms: 170 path = '%s?%s' % (path, urlencode(parms)) 171 172 headers = {'Content-Length': str(len(data)), 173 'User-Agent': self.user_agent, 174 'X-Auth-Token': self.token} 175 isinstance(hdrs, dict) and headers.update(hdrs) 176 177 def retry_request(): 178 '''Re-connect and re-try a failed request once''' 179 self.http_connect() 180 self.connection.request(method, path, data, headers) 181 return self.connection.getresponse()
182 183 try: 184 self.connection.request(method, path, data, headers) 185 response = self.connection.getresponse() 186 except (socket.error, IOError, HTTPException): 187 response = retry_request() 188 if response.status == 401: 189 self._authenticate() 190 headers['X-Auth-Token'] = self.token 191 response = retry_request() 192 193 return response 194
195 - def get_info(self):
196 """ 197 Return tuple for number of containers, total bytes in the account and account metadata 198 199 >>> connection.get_info() 200 (5, 2309749) 201 202 @rtype: tuple 203 @return: a tuple containing the number of containers, total bytes 204 used by the account and a dictionary containing account metadata 205 """ 206 response = self.make_request('HEAD') 207 count = size = None 208 metadata = {} 209 for hdr in response.getheaders(): 210 if hdr[0].lower() == 'x-account-container-count': 211 try: 212 count = int(hdr[1]) 213 except ValueError: 214 count = 0 215 if hdr[0].lower() == 'x-account-bytes-used': 216 try: 217 size = int(hdr[1]) 218 except ValueError: 219 size = 0 220 if hdr[0].lower().startswith('x-account-meta-'): 221 metadata[hdr[0].lower()[15:]] = hdr[1] 222 buff = response.read() 223 if (response.status < 200) or (response.status > 299): 224 raise ResponseError(response.status, response.reason) 225 return (count, size, metadata)
226
227 - def update_account_metadata(self, metadata):
228 """ 229 Update account metadata 230 >>> metadata = {'x-account-meta-foo' : 'bar'} 231 >>> connection.update_account_metadata(metadata) 232 233 @param metadata: Dictionary of metadata 234 @type metdada: dict 235 """ 236 response = self.make_request('POST', hdrs=metadata) 237 response.read() 238 if (response.status < 200) or (response.status > 299): 239 raise ResponseError(response.status, response.reason)
240
241 - def _check_container_name(self, container_name):
242 if not container_name or \ 243 '/' in container_name or \ 244 len(container_name) > consts.container_name_limit: 245 raise InvalidContainerName(container_name)
246
247 - def create_container(self, container_name, error_on_existing=False):
248 """ 249 Given a container name, returns a L{Container} item, creating a new 250 Container if one does not already exist. 251 252 >>> connection.create_container('new_container') 253 <cloudfiles.container.Container object at 0xb77d628c> 254 255 @param container_name: name of the container to create 256 @type container_name: str 257 @param error_on_existing: raise ContainerExists if container already 258 exists 259 @type error_on_existing: bool 260 @rtype: L{Container} 261 @return: an object representing the newly created container 262 """ 263 self._check_container_name(container_name) 264 265 response = self.make_request('PUT', [container_name]) 266 buff = response.read() 267 if (response.status < 200) or (response.status > 299): 268 raise ResponseError(response.status, response.reason) 269 if error_on_existing and (response.status == 202): 270 raise ContainerExists(container_name) 271 return Container(self, container_name)
272
273 - def delete_container(self, container_name):
274 """ 275 Given a container name, delete it. 276 277 >>> connection.delete_container('old_container') 278 279 @param container_name: name of the container to delete 280 @type container_name: str 281 """ 282 if isinstance(container_name, Container): 283 container_name = container_name.name 284 self._check_container_name(container_name) 285 286 response = self.make_request('DELETE', [container_name]) 287 response.read() 288 289 if (response.status == 409): 290 raise ContainerNotEmpty(container_name) 291 elif (response.status == 404): 292 raise NoSuchContainer 293 elif (response.status < 200) or (response.status > 299): 294 raise ResponseError(response.status, response.reason) 295 296 if self.cdn_enabled: 297 response = self.cdn_request('POST', [container_name], 298 hdrs={'X-CDN-Enabled': 'False'})
299
300 - def get_all_containers(self, limit=None, marker=None, **parms):
301 """ 302 Returns a Container item result set. 303 304 >>> connection.get_all_containers() 305 ContainerResults: 4 containers 306 >>> print ', '.join([container.name for container in 307 connection.get_all_containers()]) 308 new_container, old_container, pictures, music 309 310 @rtype: L{ContainerResults} 311 @return: an iterable set of objects representing all containers on the 312 account 313 @param limit: number of results to return, up to 10,000 314 @type limit: int 315 @param marker: return only results whose name is greater than "marker" 316 @type marker: str 317 """ 318 if limit: 319 parms['limit'] = limit 320 if marker: 321 parms['marker'] = marker 322 return ContainerResults(self, self.list_containers_info(**parms))
323
324 - def get_container(self, container_name):
325 """ 326 Return a single Container item for the given Container. 327 328 >>> connection.get_container('old_container') 329 <cloudfiles.container.Container object at 0xb77d628c> 330 >>> container = connection.get_container('old_container') 331 >>> container.size_used 332 23074 333 334 @param container_name: name of the container to create 335 @type container_name: str 336 @rtype: L{Container} 337 @return: an object representing the container 338 """ 339 self._check_container_name(container_name) 340 341 response = self.make_request('HEAD', [container_name]) 342 count = size = None 343 metadata = {} 344 for hdr in response.getheaders(): 345 if hdr[0].lower() == 'x-container-object-count': 346 try: 347 count = int(hdr[1]) 348 except ValueError: 349 count = 0 350 if hdr[0].lower() == 'x-container-bytes-used': 351 try: 352 size = int(hdr[1]) 353 except ValueError: 354 size = 0 355 if hdr[0].lower().startswith('x-container-meta-'): 356 metadata[hdr[0].lower()[17:]] = hdr[1] 357 buff = response.read() 358 if response.status == 404: 359 raise NoSuchContainer(container_name) 360 if (response.status < 200) or (response.status > 299): 361 raise ResponseError(response.status, response.reason) 362 return Container(self, container_name, count, size, metadata)
363
364 - def list_public_containers(self):
365 """ 366 Returns a list of containers that have been published to the CDN. 367 368 >>> connection.list_public_containers() 369 ['container1', 'container2', 'container3'] 370 371 @rtype: list(str) 372 @return: a list of all CDN-enabled container names as strings 373 """ 374 response = self.cdn_request('GET', ['']) 375 if (response.status < 200) or (response.status > 299): 376 buff = response.read() 377 raise ResponseError(response.status, response.reason) 378 return response.read().splitlines()
379
380 - def list_containers_info(self, limit=None, marker=None, **parms):
381 """ 382 Returns a list of Containers, including object count and size. 383 384 >>> connection.list_containers_info() 385 [{u'count': 510, u'bytes': 2081717, u'name': u'new_container'}, 386 {u'count': 12, u'bytes': 23074, u'name': u'old_container'}, 387 {u'count': 0, u'bytes': 0, u'name': u'container1'}, 388 {u'count': 0, u'bytes': 0, u'name': u'container2'}, 389 {u'count': 0, u'bytes': 0, u'name': u'container3'}, 390 {u'count': 3, u'bytes': 2306, u'name': u'test'}] 391 392 @rtype: list({"name":"...", "count":..., "bytes":...}) 393 @return: a list of all container info as dictionaries with the 394 keys "name", "count", and "bytes" 395 @param limit: number of results to return, up to 10,000 396 @type limit: int 397 @param marker: return only results whose name is greater than "marker" 398 @type marker: str 399 """ 400 if limit: 401 parms['limit'] = limit 402 if marker: 403 parms['marker'] = marker 404 parms['format'] = 'json' 405 response = self.make_request('GET', [''], parms=parms) 406 if (response.status < 200) or (response.status > 299): 407 buff = response.read() 408 raise ResponseError(response.status, response.reason) 409 return json_loads(response.read())
410
411 - def list_containers(self, limit=None, marker=None, **parms):
412 """ 413 Returns a list of Containers. 414 415 >>> connection.list_containers() 416 ['new_container', 417 'old_container', 418 'container1', 419 'container2', 420 'container3', 421 'test'] 422 423 @rtype: list(str) 424 @return: a list of all containers names as strings 425 @param limit: number of results to return, up to 10,000 426 @type limit: int 427 @param marker: return only results whose name is greater than "marker" 428 @type marker: str 429 """ 430 if limit: 431 parms['limit'] = limit 432 if marker: 433 parms['marker'] = marker 434 response = self.make_request('GET', [''], parms=parms) 435 if (response.status < 200) or (response.status > 299): 436 buff = response.read() 437 raise ResponseError(response.status, response.reason) 438 return response.read().splitlines()
439
440 - def __getitem__(self, key):
441 """ 442 Container objects can be grabbed from a connection using index 443 syntax. 444 445 >>> container = conn['old_container'] 446 >>> container.size_used 447 23074 448 449 @rtype: L{Container} 450 @return: an object representing the container 451 """ 452 return self.get_container(key)
453 454
455 -class ConnectionPool(Queue):
456 """ 457 A thread-safe connection pool object. 458 459 This component isn't required when using the cloudfiles library, but it may 460 be useful when building threaded applications. 461 """ 462
463 - def __init__(self, username=None, api_key=None, **kwargs):
464 poolsize = kwargs.pop('poolsize', 10) 465 self.connargs = {'username': username, 'api_key': api_key} 466 self.connargs.update(kwargs) 467 Queue.__init__(self, poolsize)
468
469 - def get(self):
470 """ 471 Return a cloudfiles connection object. 472 473 @rtype: L{Connection} 474 @return: a cloudfiles connection object 475 """ 476 try: 477 (create, connobj) = Queue.get(self, block=0) 478 except Empty: 479 connobj = Connection(**self.connargs) 480 return connobj
481
482 - def put(self, connobj):
483 """ 484 Place a cloudfiles connection object back into the pool. 485 486 @param connobj: a cloudfiles connection object 487 @type connobj: L{Connection} 488 """ 489 try: 490 Queue.put(self, (time(), connobj), block=0) 491 except Full: 492 del connobj
493 # vim:set ai sw=4 ts=4 tw=0 expandtab: 494