Package cloudfiles :: Module storage_object
[frames | no frames]

Source Code for Module cloudfiles.storage_object

  1  """ 
  2  Object operations 
  3   
  4  An Object is analogous to a file on a conventional filesystem. You can 
  5  read data from, or write data to your Objects. You can also associate 
  6  arbitrary metadata with them. 
  7   
  8  See COPYING for license information. 
  9  """ 
 10   
 11  try: 
 12      from hashlib import md5 
 13  except ImportError: 
 14      from md5 import md5 
 15  import StringIO 
 16  import mimetypes 
 17  import os 
 18   
 19  from errors  import ResponseError, NoSuchObject, \ 
 20                      InvalidObjectName, IncompleteSend, \ 
 21                      InvalidMetaName, InvalidMetaValue 
 22   
 23  from socket  import timeout 
 24  import consts 
 25  from utils   import unicode_quote, requires_name 
26 27 # Because HTTPResponse objects *have* to have read() called on them 28 # before they can be used again ... 29 # pylint: disable-msg=W0612 30 31 32 -class Object(object):
33 """ 34 Storage data representing an object, (metadata and data). 35 36 @undocumented: _make_headers 37 @undocumented: _name_check 38 @undocumented: _initialize 39 @undocumented: compute_md5sum 40 @undocumented: __get_conn_for_write 41 @ivar name: the object's name (generally treat as read-only) 42 @type name: str 43 @ivar content_type: the object's content-type (set or read) 44 @type content_type: str 45 @ivar metadata: metadata associated with the object (set or read) 46 @type metadata: dict 47 @ivar size: the object's size (cached) 48 @type size: number 49 @ivar last_modified: date and time of last file modification (cached) 50 @type last_modified: str 51 @ivar container: the object's container (generally treat as read-only) 52 @type container: L{Container} 53 """ 54 # R/O support of the legacy objsum attr. 55 objsum = property(lambda self: self._etag) 56
def __set_etag(self, value):
    """
    Setter behind the C{etag} property.

    Stores the supplied value and marks the etag as manually assigned.
    The upload and copy methods check that marker and leave a manually
    assigned etag untouched instead of clearing it.

    @param value: the etag (md5 hexdigest) to associate with the object
    @type value: str
    """
    self._etag_override = True
    self._etag = value

# Reading returns the stored etag; assigning goes through __set_etag so
# that the override marker is always raised together with the value.
etag = property(lambda self: self._etag, __set_etag)
def __init__(self, container, name=None,
             force_exists=False, object_record=None):
    """
    Storage objects rarely if ever need to be instantiated directly by the
    user.

    Instead, use the L{Container.create_object}, L{Container.get_object},
    L{Container.list_objects} and other methods on the parent
    L{Container} object.

    @param container: the container this object belongs to
    @type container: L{Container}
    @param name: the object's name; ignored when object_record is given
    @type name: str
    @param force_exists: raise NoSuchObject if the remote lookup finds
                         nothing
    @type force_exists: boolean
    @param object_record: a listing record with the keys 'name',
                          'content_type', 'bytes', 'last_modified' and
                          'hash'; when given, no remote lookup is made
    @type object_record: dict
    @raise NoSuchObject: force_exists was set and the object is missing
    """
    self.container = container
    self.last_modified = None
    self.metadata = {}
    self.headers = {}
    self.manifest = None
    self._etag_override = False

    if object_record:
        # Everything comes straight from the listing record.
        self.name = object_record['name']
        self.content_type = object_record['content_type']
        self.size = object_record['bytes']
        self.last_modified = object_record['last_modified']
        self._etag = object_record['hash']
        return

    self.name = name
    self.content_type = None
    self.size = None
    self._etag = None
    # The lookup always runs; force_exists only decides whether a miss
    # is an error.
    found = self._initialize()
    if force_exists and not found:
        raise NoSuchObject(self.name)
@requires_name(InvalidObjectName)
def read(self, size=-1, offset=0, hdrs=None, buffer=None, callback=None):
    """
    Read the content from the remote storage object.

    By default this method will buffer the response in memory and
    return it as a string. However, if a file-like object is passed
    in using the buffer keyword, the response will be written to it
    instead.

    A callback can be passed in for reporting on the progress of
    the download. The callback should accept two integers, the first
    will be for the amount of data written so far, the second is the
    object's cached size attribute. Note: This option is only
    applicable when used in conjunction with the buffer option.

    >>> test_object.write('hello')
    >>> test_object.read()
    'hello'

    @param size: combined with offset, defines the length of data to be
                 read; a Range header is only sent when size > 0
    @type size: number
    @param offset: combined with size, defines the start location to be
                   read
    @type offset: number
    @param hdrs: an optional dict of headers to send with the request;
                 it is never modified
    @type hdrs: dictionary
    @param buffer: an optional file-like object to write the content to
    @type buffer: file-like object
    @param callback: function to be used as a progress callback
    @type callback: callable(transferred, size)
    @rtype: str or None
    @return: a string of all data in the object, or None if a buffer is
             used
    @raise ResponseError: the server answered with a non-2xx status
    """
    self._name_check()
    if size > 0:
        # Build the request headers in a fresh dict so the Range header
        # does not leak into the dictionary owned by the caller.
        request_hdrs = {}
        if hdrs:
            request_hdrs.update(hdrs)
        request_hdrs['Range'] = 'bytes=%d-%d' % (offset,
                                                 (offset + size) - 1)
        hdrs = request_hdrs
    response = self.container.conn.make_request('GET',
        path=[self.container.name, self.name], hdrs=hdrs)
    if (response.status < 200) or (response.status > 299):
        # The response has to be read before the connection is reused.
        response.read()
        raise ResponseError(response.status, response.reason)

    if hasattr(buffer, 'write'):
        transferred = 0
        scratch = response.read(8192)
        while len(scratch) > 0:
            buffer.write(scratch)
            transferred += len(scratch)
            if callable(callback):
                callback(transferred, self.size)
            scratch = response.read(8192)
        return None
    else:
        return response.read()
157
def save_to_filename(self, filename, callback=None):
    """
    Download the object and store its contents in a local file.

    The file is opened in binary write mode and handed to L{read} as
    the buffer; it is closed again whether or not the download succeeds.

    >>> container = connection['container1']
    >>> obj = container.get_object('backup_file')
    >>> obj.save_to_filename('./backup_file')

    @param filename: name of the file
    @type filename: str
    @param callback: function to be used as a progress callback
    @type callback: callable(transferred, size)
    """
    target = open(filename, 'wb')
    try:
        self.read(callback=callback, buffer=target)
    finally:
        target.close()
@requires_name(InvalidObjectName)
def stream(self, chunksize=8192, hdrs=None):
    """
    Return a generator of the remote storage object's data.

    Warning: The HTTP response is only complete after this generator
    has raised a StopIteration. No other methods can be called until
    this has occurred. Because this is a generator, the request itself
    (and any resulting error) only happens once iteration starts.

    >>> test_object.write('hello')
    >>> '-'.join(test_object.stream(chunksize=1))
    'h-e-l-l-o'

    @param chunksize: size in bytes yielded by the generator
    @type chunksize: number
    @param hdrs: an optional dict of headers to send in the request
    @type hdrs: dict
    @rtype: str generator
    @return: a generator which yields strings as the object is downloaded
    @raise ResponseError: the server answered with a non-2xx status
    """
    self._name_check()
    target = [self.container.name, self.name]
    response = self.container.conn.make_request('GET', path=target,
                                                hdrs=hdrs)
    if not (200 <= response.status <= 299):
        response.read()
        raise ResponseError(response.status, response.reason)

    while True:
        chunk = response.read(chunksize)
        if len(chunk) == 0:
            break
        yield chunk
    # One last unbounded read after the chunked loop is exhausted, so
    # the response counts as fully consumed (httplib is picky).
    response.read()
@requires_name(InvalidObjectName)
def sync_metadata(self):
    """
    Commits the metadata and custom headers to the remote storage system.

    >>> test_object = container['paradise_lost.pdf']
    >>> test_object.metadata = {'author': 'John Milton'}
    >>> test_object.headers = {'content-disposition': 'foo'}
    >>> test_object.sync_metadata()

    Object metadata can be set and retrieved through the object's
    .metadata attribute. Nothing is sent when both metadata and headers
    are empty.

    @raise ResponseError: the server did not answer with status 202
    """
    self._name_check()
    if not (self.metadata or self.headers):
        return

    request_headers = self._make_headers()
    # The POST carries no body, whatever size the object itself has.
    request_headers['Content-Length'] = "0"
    target = [self.container.name, self.name]
    response = self.container.conn.make_request(
        'POST', target, hdrs=request_headers, data='')
    response.read()
    if response.status == 202:
        return
    raise ResponseError(response.status, response.reason)
@requires_name(InvalidObjectName)
def sync_manifest(self):
    """
    Commits the manifest to the remote storage system.

    >>> test_object = container['paradise_lost.pdf']
    >>> test_object.manifest = 'container/prefix'
    >>> test_object.sync_manifest()

    Object manifests can be set and retrieved through the object's
    .manifest attribute. Nothing is sent when no manifest is set.

    @raise ResponseError: the server answered with a non-2xx status
    """
    self._name_check()
    if not self.manifest:
        return

    request_headers = self._make_headers()
    # The PUT is sent with an empty body.
    request_headers['Content-Length'] = "0"
    target = [self.container.name, self.name]
    response = self.container.conn.make_request(
        'PUT', target, hdrs=request_headers, data='')
    response.read()
    if not (200 <= response.status <= 299):
        raise ResponseError(response.status, response.reason)
258
def __get_conn_for_write(self):
    """
    Start a PUT request for this object on the connection's underlying
    HTTP connection and return that connection, ready for body data.

    The request line and all headers (object headers, auth token and
    user agent) are sent; the caller is expected to send the body and
    fetch the response.

    @return: the HTTP connection with the request headers already sent
    """
    headers = self._make_headers()
    conn = self.container.conn
    headers['X-Auth-Token'] = conn.token

    path = "/%s/%s/%s" % (conn.uri.rstrip('/'),
                          unicode_quote(self.container.name),
                          unicode_quote(self.name))

    # Writes bypass make_request and drive the raw connection directly.
    http = conn.connection

    # TODO: more/better exception handling please
    http.putrequest('PUT', path)
    for header_name, header_value in headers.items():
        http.putheader(header_name, header_value)
    http.putheader('User-Agent', conn.user_agent)
    http.endheaders()
    return http
# pylint: disable-msg=W0622
@requires_name(InvalidObjectName)
def write(self, data='', verify=True, callback=None):
    """
    Write data to the remote storage system.

    By default, (verify=True), an md5 checksum of the uploaded data is
    calculated locally and stored in the etag attribute. When
    verification is disabled, (verify=False), the etag attribute will
    be set to the value returned by the server, not one calculated
    locally. When disabling verification, there is no guarantee that
    what you think was uploaded matches what was actually stored. Use
    this option carefully. You have been warned.

    If an etag was assigned manually beforehand it is sent with the
    request and no local checksum is calculated.

    A callback can be passed in for reporting on the progress of
    the upload. The callback should accept two integers, the first
    will be for the amount of data written so far, the second for
    the total size of the transfer. It is invoked once per chunk sent,
    and the last invocation reports the full size.

    >>> test_object = container.create_object('file.txt')
    >>> test_object.content_type = 'text/plain'
    >>> fp = open('./file.txt')
    >>> test_object.write(fp)

    @param data: the data to be written
    @type data: str or file
    @param verify: enable/disable local checksum calculation
    @type verify: boolean
    @param callback: function to be used as a progress callback
    @type callback: callable(transferred, size)
    @raise ResponseError: the server answered with a non-2xx status
    """
    self._name_check()
    if isinstance(data, file):
        # pylint: disable-msg=E1101
        try:
            data.flush()
        except IOError:
            pass  # If the file descriptor is read-only this will fail
        self.size = int(os.fstat(data.fileno())[6])
    elif isinstance(data, basestring):
        data = StringIO.StringIO(data)
        self.size = data.len
    elif isinstance(data, StringIO.StringIO):
        self.size = data.len
    else:
        self.size = len(data)

    # If override is set (and _etag is not None), then the etag has
    # been manually assigned and we will not calculate our own.
    if not self._etag_override:
        self._etag = None

    if not self.content_type:
        # pylint: disable-msg=E1101
        guessed_type = None
        if hasattr(data, 'name'):
            guessed_type = mimetypes.guess_type(data.name)[0]
        self.content_type = guessed_type or 'application/octet-stream'

    http = self.__get_conn_for_write()

    response = None
    transferred = 0
    running_checksum = md5()

    buff = data.read(4096)
    try:
        while len(buff) > 0:
            http.send(buff)
            if verify and not self._etag_override:
                running_checksum.update(buff)
            # Account for the chunk that was just sent *before* fetching
            # the next one; counting the next chunk instead would skip
            # the first chunk and never report the full size.
            transferred += len(buff)
            if callable(callback):
                callback(transferred, self.size)
            buff = data.read(4096)
        response = http.getresponse()
        response.read()
    except timeout:
        if response:
            # pylint: disable-msg=E1101
            response.read()
        # Bare raise keeps the original traceback.
        raise
    else:
        if verify and not self._etag_override:
            self._etag = running_checksum.hexdigest()

    if (response.status < 200) or (response.status > 299):
        raise ResponseError(response.status, response.reason)

    # If verification has been disabled for this write, then set the
    # instances etag attribute to what the server returns to us.
    if not verify:
        for hdr in response.getheaders():
            if hdr[0].lower() == 'etag':
                self._etag = hdr[1]
@requires_name(InvalidObjectName)
def copy_to(self, container_name, name):
    """
    Copy an object's contents to another location.

    Issues a COPY request on this object with a Destination header of
    "container_name/name". Afterwards the etag attribute holds the
    value returned by the server, if any.

    @param container_name: name of the destination container
    @type container_name: str
    @param name: name of the destination object
    @type name: str
    @raise InvalidObjectName: either object name is too long
    @raise ResponseError: the server answered with a non-2xx status
    """
    self._name_check()
    self._name_check(name)

    # This method implicitly disables verification.
    if not self._etag_override:
        self._etag = None

    request_headers = self._make_headers()
    destination = "%s/%s" % (container_name, name)
    request_headers['Destination'] = unicode_quote(destination)
    request_headers['Content-Length'] = 0
    source = [self.container.name, self.name]
    response = self.container.conn.make_request(
        'COPY', source, hdrs=request_headers, data='')
    response.read()

    if not (200 <= response.status <= 299):
        raise ResponseError(response.status, response.reason)

    # Reset the etag to what the server returns.
    for header_name, header_value in response.getheaders():
        if header_name.lower() == 'etag':
            self._etag = header_value
@requires_name(InvalidObjectName)
def copy_from(self, container_name, name):
    """
    Copy another object's contents to this object.

    Issues a bodiless PUT on this object with an X-Copy-From header of
    "container_name/name". Afterwards the etag attribute holds the
    value returned by the server, if any.

    @param container_name: name of the source container
    @type container_name: str
    @param name: name of the source object
    @type name: str
    @raise InvalidObjectName: either object name is too long
    @raise ResponseError: the server answered with a non-2xx status
    """
    self._name_check()
    self._name_check(name)

    # This method implicitly disables verification.
    if not self._etag_override:
        self._etag = None

    request_headers = self._make_headers()
    origin = "%s/%s" % (container_name, name)
    request_headers['X-Copy-From'] = unicode_quote(origin)
    request_headers['Content-Length'] = 0
    target = [self.container.name, self.name]
    response = self.container.conn.make_request(
        'PUT', target, hdrs=request_headers, data='')
    response.read()

    if not (200 <= response.status <= 299):
        raise ResponseError(response.status, response.reason)

    # Reset the etag to what the server returns.
    for header_name, header_value in response.getheaders():
        if header_name.lower() == 'etag':
            self._etag = header_value
@requires_name(InvalidObjectName)
def send(self, iterable):
    """
    Write potentially transient data to the remote storage system using a
    generator or stream.

    If the object's size is not set, chunked transfer encoding will be
    used to upload the file.

    If the object's size attribute is set, it will be used as the
    Content-Length. If the generator raises StopIteration prior to
    yielding the right number of bytes, an IncompleteSend exception is
    raised.

    If the content_type attribute is not set then a value of
    application/octet-stream will be used.

    Server-side verification will be performed if an md5 checksum is
    assigned to the etag property before calling this method,
    otherwise no verification will be performed, (verification
    can be performed afterward though by using the etag attribute
    which is set to the value returned by the server).

    A plain string is handed to L{write} and uploaded exactly once.

    >>> test_object = container.create_object('backup.tar.gz')
    >>> pfd = os.popen('tar -czvf - ./data/', 'r')
    >>> test_object.send(pfd)

    @param iterable: stream or generator which yields the content to upload
    @type iterable: generator or stream
    @raise IncompleteSend: fewer bytes than the size attribute were yielded
    @raise ResponseError: the server answered with a non-2xx status
    """
    self._name_check()

    if isinstance(iterable, basestring):
        # use write to buffer the string and avoid sending it 1 byte at
        # a time; return afterwards, otherwise the string would be
        # uploaded a second time by the loop below.
        self.write(iterable)
        return

    if hasattr(iterable, 'read'):

        def file_iterator(stream):
            # Falling off the end finishes the generator; raising
            # StopIteration by hand is an error under PEP 479.
            chunk = stream.read(4095)
            while chunk:
                yield chunk
                chunk = stream.read(4095)

        iterable = file_iterator(iterable)

    # This method implicitly disables verification.
    if not self._etag_override:
        self._etag = None

    if not self.content_type:
        self.content_type = 'application/octet-stream'

    path = "/%s/%s/%s" % (self.container.conn.uri.rstrip('/'),
                          unicode_quote(self.container.name),
                          unicode_quote(self.name))
    headers = self._make_headers()
    chunked = self.size is None
    if chunked:
        headers.pop('Content-Length', None)
        headers['Transfer-Encoding'] = 'chunked'
    headers['X-Auth-Token'] = self.container.conn.token
    headers['User-Agent'] = self.container.conn.user_agent
    http = self.container.conn.connection
    http.putrequest('PUT', path)
    for key in headers:
        http.putheader(key, headers[key])
    http.endheaders()

    response = None
    transferred = 0
    try:
        for chunk in iterable:
            if not chunk:
                # A zero-length chunk would be encoded as "0\r\n\r\n",
                # which terminates a chunked body prematurely.
                continue
            if chunked:
                http.send("%X\r\n" % len(chunk))
                http.send(chunk)
                http.send("\r\n")
            else:
                http.send(chunk)
            transferred += len(chunk)
        if chunked:
            http.send("0\r\n\r\n")
        # If the generator didn't yield enough data, stop, drop, and roll.
        elif transferred < self.size:
            raise IncompleteSend()
        response = http.getresponse()
        response.read()
    except timeout:
        if response:
            # pylint: disable-msg=E1101
            response.read()
        # Bare raise keeps the original traceback.
        raise

    if (response.status < 200) or (response.status > 299):
        raise ResponseError(response.status, response.reason)

    for hdr in response.getheaders():
        if hdr[0].lower() == 'etag':
            self._etag = hdr[1]
530
def load_from_filename(self, filename, verify=True, callback=None):
    """
    Put the contents of the named file into remote storage.

    The file is opened in binary read mode, passed to L{write}, and
    closed again even when the upload fails.

    >>> test_object = container.create_object('file.txt')
    >>> test_object.content_type = 'text/plain'
    >>> test_object.load_from_filename('./my_file.txt')

    @param filename: path to the file
    @type filename: str
    @param verify: enable/disable server-side checksum verification
    @type verify: boolean
    @param callback: function to be used as a progress callback
    @type callback: callable(transferred, size)
    """
    fobj = open(filename, 'rb')
    try:
        self.write(fobj, verify=verify, callback=callback)
    finally:
        # Mirrors save_to_filename: never leak the handle on errors.
        fobj.close()
549
def _initialize(self):
    """
    Initialize the Object with values from the remote service (if any).

    Sends a HEAD request for the object and copies the manifest,
    content type, X-Object-Meta-* metadata, etag, size and
    last-modified date from the response headers onto the instance.

    @rtype: boolean
    @return: False when the object has no name or the server answers
             404, True once the attributes have been loaded
    @raise ResponseError: the server answered with any other non-2xx
                          status
    """
    if not self.name:
        return False

    response = self.container.conn.make_request(
        'HEAD', [self.container.name, self.name])
    response.read()
    status = response.status
    if status == 404:
        return False
    if not (200 <= status <= 299):
        raise ResponseError(response.status, response.reason)

    for header_name, header_value in response.getheaders():
        lowered = header_name.lower()
        if lowered == 'x-object-manifest':
            self.manifest = header_value
        elif lowered == 'content-type':
            self.content_type = header_value
        elif lowered.startswith('x-object-meta-'):
            # Strip the 14-character 'x-object-meta-' prefix, keeping
            # the key's spelling as the server sent it.
            self.metadata[header_name[14:]] = header_value
        elif lowered == 'etag':
            self._etag = header_value
            self._etag_override = False
        elif lowered == 'content-length':
            self.size = int(header_value)
        elif lowered == 'last-modified':
            self.last_modified = header_value
    return True
579
def __str__(self):
    """
    Return the object's name.

    An object that has not been given a name yet yields an empty
    string, because returning None from __str__ makes str() raise a
    TypeError.

    @rtype: str
    @return: the object's name, or '' when it has none
    """
    if self.name is None:
        return ''
    return self.name
582
def _name_check(self, name=None):
    """
    Validate the length of an object name.

    @param name: the name to check; the object's own name is checked
                 when omitted
    @type name: str
    @raise InvalidObjectName: the name is longer than
                              consts.object_name_limit
    """
    candidate = name
    if candidate is None:
        candidate = self.name
    if len(candidate) <= consts.object_name_limit:
        return
    raise InvalidObjectName(candidate)
588
def _make_headers(self):
    """
    Returns a dictionary representing http headers based on the
    respective instance attributes.

    Content-Length is the string form of the size attribute, or "0"
    when the size is unknown (None), zero or empty. Content-Type falls
    back to application/octet-stream. Every metadata entry becomes an
    X-Object-Meta-* header, and entries of the headers attribute are
    applied last, so they override any generated header.

    @rtype: dict
    @return: the headers to send for this object
    @raise InvalidMetaName: a metadata key exceeds consts.meta_name_limit
    @raise InvalidMetaValue: a metadata value exceeds
                             consts.meta_value_limit
    """
    headers = {}

    # An unknown size must not be rendered as the literal string "None".
    content_length = "0"
    if self.size is not None:
        content_length = str(self.size) or "0"
    headers['Content-Length'] = content_length

    if self.manifest:
        headers['X-Object-Manifest'] = self.manifest
    if self._etag:
        headers['ETag'] = self._etag

    if self.content_type:
        headers['Content-Type'] = self.content_type
    else:
        headers['Content-Type'] = 'application/octet-stream'

    for key in self.metadata:
        value = self.metadata[key]
        if len(key) > consts.meta_name_limit:
            raise InvalidMetaName(key)
        if len(value) > consts.meta_value_limit:
            raise InvalidMetaValue(value)
        headers['X-Object-Meta-' + key] = value
    headers.update(self.headers)
    return headers
@classmethod
def compute_md5sum(cls, fobj):
    """
    Given an open file object, returns the md5 hexdigest of the data.

    The file is read from its current position in 4096-byte blocks and
    rewound to the beginning afterwards.

    @param fobj: an open, seekable file-like object
    @type fobj: file
    @rtype: str
    @return: the md5 hexdigest of the data read
    """
    digest = md5()
    while True:
        block = fobj.read(4096)
        if not block:
            break
        digest.update(block)
    fobj.seek(0)
    return digest.hexdigest()
628
def public_uri(self):
    """
    Retrieve the URI for this object, if its container is public.

    The result is the container's public URI, without trailing
    slashes, followed by the quoted object name.

    >>> container1 = connection['container1']
    >>> container1.make_public()
    >>> container1.create_object('file.txt').write('testing')
    >>> container1['file.txt'].public_uri()
    'http://c00061.cdn.cloudfiles.rackspacecloud.com/file.txt'

    @return: the public URI for this object
    @rtype: str
    """
    base = self.container.public_uri().rstrip('/')
    return "%s/%s" % (base, unicode_quote(self.name))
644
def public_ssl_uri(self):
    """
    Retrieve the SSL URI for this object, if its container is public.

    The result is the container's public SSL URI, without trailing
    slashes, followed by the quoted object name.

    >>> container1 = connection['container1']
    >>> container1.make_public()
    >>> container1.create_object('file.txt').write('testing')
    >>> container1['file.txt'].public_ssl_uri()
    'https://c61.ssl.cf0.rackcdn.com/file.txt'

    @return: the public SSL URI for this object
    @rtype: str
    """
    base = self.container.public_ssl_uri().rstrip('/')
    return "%s/%s" % (base, unicode_quote(self.name))
660
def public_streaming_uri(self):
    """
    Retrieve the streaming URI for this object, if its container is public.

    The result is the container's public streaming URI, without
    trailing slashes, followed by the quoted object name.

    >>> container1 = connection['container1']
    >>> container1.make_public()
    >>> container1.create_object('file.txt').write('testing')
    >>> container1['file.txt'].public_streaming_uri()
    'https://c61.stream.rackcdn.com/file.txt'

    @return: the public Streaming URI for this object
    @rtype: str
    """
    base = self.container.public_streaming_uri().rstrip('/')
    return "%s/%s" % (base, unicode_quote(self.name))
676
def purge_from_cdn(self, email=None):
    """
    Purge Edge cache for this object.
    You will be notified by email if one is provided when the
    job completes.

    >>> obj.purge_from_cdn("user@domain.com")

    or

    >>> obj.purge_from_cdn("user@domain.com,user2@domain.com")

    or

    >>> obj.purge_from_cdn()

    @param email: A Valid email address
    @type email: str
    @raise CDNNotEnabled: the connection has no CDN support enabled
    @raise ResponseError: the server answered with a non-2xx status
    """
    conn = self.container.conn
    if not conn.cdn_enabled:
        # This module never imports CDNNotEnabled at the top, so it is
        # fetched here to avoid a NameError.
        # NOTE(review): assumes the package's errors module defines
        # CDNNotEnabled -- confirm.
        from errors import CDNNotEnabled
        raise CDNNotEnabled()

    # The notification header is only sent when an address was given.
    extra = {}
    if email:
        extra['hdrs'] = {"X-Purge-Email": email}
    response = conn.cdn_request('DELETE',
                                [self.container.name, self.name], **extra)

    # Same 2xx window as every other status check in this module.
    if (response.status < 200) or (response.status > 299):
        raise ResponseError(response.status, response.reason)
709
class ObjectResults(object):
    """
    An iterable results set object for Objects.

    This class implements dictionary- and list-like interfaces. It
    wraps the raw listing records of a container and builds L{Object}
    instances from them on access.
    """

    def __init__(self, container, objects=None):
        """
        @param container: the container the records were listed from
        @type container: L{Container}
        @param objects: listing records; records without a 'name' key
                        are dropped
        @type objects: list of dict
        """
        if objects is None:
            objects = []
        self._names = []
        self._objects = []
        for obj in objects:
            try:
                self._names.append(obj['name'])
            except KeyError:
                # pseudo-objects from a delimiter query don't have names
                continue
            else:
                self._objects.append(obj)
        self.container = container

    def __getitem__(self, key):
        """
        Return the L{Object} at an index, or a list of them for a slice.

        Slice objects arrive here for extended slices, and for every
        slice on interpreters without __getslice__ support.
        """
        if isinstance(key, slice):
            return [Object(self.container, object_record=record)
                    for record in self._objects[key]]
        return Object(self.container, object_record=self._objects[key])

    def __getslice__(self, i, j):
        """
        Return a list of L{Object} instances for the records in [i:j].
        """
        return [Object(self.container, object_record=k)
                for k in self._objects[i:j]]

    def __contains__(self, item):
        """
        Report whether an object name, or a raw listing record, is part
        of the results. Names are accepted so that membership agrees
        with index() and count(), which also work on names.
        """
        return item in self._names or item in self._objects

    def __len__(self):
        """
        Return the number of named records held.
        """
        return len(self._objects)

    def __repr__(self):
        return 'ObjectResults: %s objects' % len(self._objects)
    __str__ = __repr__

    def index(self, value, *args):
        """
        returns an integer for the first index of value

        @param value: an object name
        @type value: str
        """
        return self._names.index(value, *args)

    def count(self, value):
        """
        returns the number of occurrences of value

        @param value: an object name
        @type value: str
        """
        return self._names.count(value)
760 761 # vim:set ai sw=4 ts=4 tw=0 expandtab: 762