1 """
2 Object operations
3
4 An Object is analogous to a file on a conventional filesystem. You can
5 read data from, or write data to your Objects. You can also associate
6 arbitrary metadata with them.
7
8 See COPYING for license information.
9 """
10
11 try:
12 from hashlib import md5
13 except ImportError:
14 from md5 import md5
15 import StringIO
16 import mimetypes
17 import os
18
19 from errors import ResponseError, NoSuchObject, \
20 InvalidObjectName, IncompleteSend, \
21 InvalidMetaName, InvalidMetaValue
22
23 from socket import timeout
24 import consts
25 from utils import unicode_quote, requires_name
26
27
28
29
30
31
32 -class Object(object):
33 """
34 Storage data representing an object, (metadata and data).
35
36 @undocumented: _make_headers
37 @undocumented: _name_check
38 @undocumented: _initialize
39 @undocumented: compute_md5sum
40 @undocumented: __get_conn_for_write
41 @ivar name: the object's name (generally treat as read-only)
42 @type name: str
43 @ivar content_type: the object's content-type (set or read)
44 @type content_type: str
45 @ivar metadata: metadata associated with the object (set or read)
46 @type metadata: dict
47 @ivar size: the object's size (cached)
48 @type size: number
49 @ivar last_modified: date and time of last file modification (cached)
50 @type last_modified: str
51 @ivar container: the object's container (generally treat as read-only)
52 @type container: L{Container}
53 """
54
55 objsum = property(lambda self: self._etag)
56
58 self._etag = value
59 self._etag_override = True
60
61 etag = property(lambda self: self._etag, __set_etag)
62
def __init__(self, container, name=None,
             force_exists=False, object_record=None):
    """
    Storage objects rarely if ever need to be instantiated directly by the
    user.

    Instead, use the L{create_object<Container.create_object>},
    L{get_object<Container.get_object>},
    L{list_objects<Container.list_objects>} and other
    methods on its parent L{Container} object.

    @param container: the container this object belongs to
    @type container: L{Container}
    @param name: the object's name; ignored when object_record is given
    @type name: str
    @param force_exists: raise L{NoSuchObject} when _initialize() returns
                         a false value
    @type force_exists: boolean
    @param object_record: a record providing the keys 'name',
                          'content_type', 'bytes', 'last_modified' and
                          'hash'; when given, the attributes are copied
                          from it
    @type object_record: dict
    """
    # State shared by both construction paths.
    self.container = container
    self.last_modified = None
    self.metadata = {}
    self.headers = {}
    self.manifest = None

    if object_record:
        # Everything needed is already in the listing record.
        self.name = object_record['name']
        self.content_type = object_record['content_type']
        self.size = object_record['bytes']
        self.last_modified = object_record['last_modified']
        self._etag = object_record['hash']
        self._etag_override = False
        return

    self.name = name
    self.content_type = None
    self.size = None
    self._etag = None
    self._etag_override = False
    # NOTE(review): the extracted source lost its indentation; this lookup
    # is assumed to belong to the "no object_record" path only - confirm.
    loaded = self._initialize()
    if force_exists and not loaded:
        raise NoSuchObject(self.name)
94
95 @requires_name(InvalidObjectName)
def read(self, size=-1, offset=0, hdrs=None, buffer=None, callback=None):
    """
    Read the content from the remote storage object.

    By default this method will buffer the response in memory and
    return it as a string. However, if a file-like object is passed
    in using the buffer keyword, the response will be written to it
    instead.

    A callback can be passed in for reporting on the progress of
    the download. The callback should accept two integers, the first
    will be for the amount of data written so far, the second for
    the total size of the transfer (the object's size attribute).
    Note: This option is only applicable when used in conjunction with
    the buffer option.

    The dict passed as hdrs is never modified: when a Range header has
    to be added it is added to a copy.

    >>> test_object.write('hello')
    >>> test_object.read()
    'hello'

    @param size: combined with offset, defines the length of data to be
                 read; a Range header is only sent when size > 0
    @type size: number
    @param offset: combined with size, defines the start location to be
                   read; only honoured when size > 0
    @type offset: number
    @param hdrs: an optional dict of headers to send with the request
    @type hdrs: dictionary
    @param buffer: an optional file-like object to write the content to
    @type buffer: file-like object
    @param callback: function to be used as a progress callback
    @type callback: callable(transferred, size)
    @rtype: str or None
    @return: a string of all data in the object, or None if a buffer is
             used
    @raise ResponseError: if the response status is outside 200-299
    """
    self._name_check()
    if size > 0:
        byte_range = 'bytes=%d-%d' % (offset, (offset + size) - 1)
        # Work on a copy so the caller's header dict is left untouched.
        hdrs = dict(hdrs or {})
        hdrs['Range'] = byte_range
    response = self.container.conn.make_request(
        'GET', path=[self.container.name, self.name], hdrs=hdrs)
    if (response.status < 200) or (response.status > 299):
        # Consume the error body before raising.
        response.read()
        raise ResponseError(response.status, response.reason)

    if not hasattr(buffer, 'write'):
        return response.read()

    # Copy to the caller's buffer in 8 KiB pieces, reporting progress.
    transferred = 0
    scratch = response.read(8192)
    while len(scratch) > 0:
        buffer.write(scratch)
        transferred += len(scratch)
        if callable(callback):
            callback(transferred, self.size)
        scratch = response.read(8192)
    return None
157
159 """
160 Save the contents of the object to filename.
161
162 >>> container = connection['container1']
163 >>> obj = container.get_object('backup_file')
164 >>> obj.save_to_filename('./backup_file')
165
166 @param filename: name of the file
167 @type filename: str
168 @param callback: function to be used as a progress callback
169 @type callback: callable(transferred, size)
170 """
171 fobj = open(filename, 'wb')
172 try:
173 self.read(buffer=fobj, callback=callback)
174 finally:
175 fobj.close()
176
177 @requires_name(InvalidObjectName)
def stream(self, chunksize=8192, hdrs=None):
    """
    Return a generator of the remote storage object's data.

    Warning: The HTTP response is only complete after this generator
    has raised a StopIteration. No other methods can be called until
    this has occurred.

    Nothing is requested until the generator is first advanced.

    >>> test_object.write('hello')
    >>> test_object.stream()
    <generator object at 0xb77939cc>
    >>> '-'.join(test_object.stream(chunksize=1))
    'h-e-l-l-o'

    @param chunksize: size in bytes yielded by the generator
    @type chunksize: number
    @param hdrs: an optional dict of headers to send in the request
    @type hdrs: dict
    @rtype: str generator
    @return: a generator which yields strings as the object is downloaded
    @raise ResponseError: if the response status is outside 200-299
    """
    self._name_check()
    target = [self.container.name, self.name]
    response = self.container.conn.make_request('GET', path=target,
                                                hdrs=hdrs)
    if not 200 <= response.status <= 299:
        # Consume the error body before raising.
        response.read()
        raise ResponseError(response.status, response.reason)

    while True:
        piece = response.read(chunksize)
        if len(piece) == 0:
            break
        yield piece

    # One last read once the body is exhausted, as the original did.
    response.read()
211
212 @requires_name(InvalidObjectName)
235
236 @requires_name(InvalidObjectName)
238 """
239 Commits the manifest to the remote storage system.
240
241 >>> test_object = container['paradise_lost.pdf']
242 >>> test_object.manifest = 'container/prefix'
243 >>> test_object.sync_manifest()
244
245 Object manifests can be set and retrieved through the object's
246 .manifest attribute.
247 """
248 self._name_check()
249 if self.manifest:
250 headers = self._make_headers()
251 headers['Content-Length'] = "0"
252 response = self.container.conn.make_request(
253 'PUT', [self.container.name, self.name], hdrs=headers,
254 data='')
255 response.read()
256 if response.status < 200 or response.status > 299:
257 raise ResponseError(response.status, response.reason)
258
260 headers = self._make_headers()
261
262 headers['X-Auth-Token'] = self.container.conn.token
263
264 path = "/%s/%s/%s" % (self.container.conn.uri.rstrip('/'), \
265 unicode_quote(self.container.name), unicode_quote(self.name))
266
267
268 http = self.container.conn.connection
269
270
271 http.putrequest('PUT', path)
272 for hdr in headers:
273 http.putheader(hdr, headers[hdr])
274 http.putheader('User-Agent', self.container.conn.user_agent)
275 http.endheaders()
276 return http
277
278
279 @requires_name(InvalidObjectName)
def write(self, data='', verify=True, callback=None):
    """
    Write data to the remote storage system.

    With verify=True (the default) and no etag assigned by the user, an
    md5 checksum of the bytes sent is computed locally and stored in the
    etag attribute once the upload has completed. When verification is
    disabled, (verify=False), the etag attribute will be set to the value
    returned by the server, not one calculated locally. When disabling
    verification, there is no guarantee that what you think was uploaded
    matches what was actually stored. Use this option carefully. You
    have been warned.

    A callback can be passed in for reporting on the progress of
    the upload. The callback should accept two integers, the first
    will be for the amount of data written so far, the second for
    the total size of the transfer. It is invoked once per chunk sent,
    so the last invocation reports the full size.

    >>> test_object = container.create_object('file.txt')
    >>> test_object.content_type = 'text/plain'
    >>> fp = open('./file.txt')
    >>> test_object.write(fp)

    @param data: the data to be written
    @type data: str or file
    @param verify: enable/disable checksum calculation
    @type verify: boolean
    @param callback: function to be used as a progress callback
    @type callback: callable(transferred, size)
    @raise ResponseError: if the response status is outside 200-299
    """
    self._name_check()

    # Work out the payload size; strings are wrapped so that everything
    # below can use the same read() interface.
    if isinstance(data, file):
        try:
            data.flush()
        except IOError:
            # Best effort only; presumably raised by handles that were
            # not opened for writing - verify.
            pass
        self.size = int(os.fstat(data.fileno()).st_size)
    elif isinstance(data, basestring):
        data = StringIO.StringIO(data)
        self.size = data.len
    elif isinstance(data, StringIO.StringIO):
        self.size = data.len
    else:
        self.size = len(data)

    # A stale etag must not be sent along unless the user set it.
    if not self._etag_override:
        self._etag = None

    if not self.content_type:
        guessed = None
        if hasattr(data, 'name'):
            guessed = mimetypes.guess_type(data.name)[0]
        self.content_type = guessed or 'application/octet-stream'

    http = self.__get_conn_for_write()

    response = None
    transferred = 0
    running_checksum = md5()
    want_checksum = verify and not self._etag_override

    buff = data.read(4096)
    try:
        while len(buff) > 0:
            http.send(buff)
            if want_checksum:
                running_checksum.update(buff)
            # Count the chunk that was just sent, then fetch the next one.
            transferred += len(buff)
            if callable(callback):
                callback(transferred, self.size)
            buff = data.read(4096)
        response = http.getresponse()
        response.read()
    except timeout:
        if response:
            # Try to drain the response before giving up.
            response.read()
        # Bare raise keeps the original traceback.
        raise
    else:
        if want_checksum:
            self._etag = running_checksum.hexdigest()

    if (response.status < 200) or (response.status > 299):
        raise ResponseError(response.status, response.reason)

    # Without local verification, trust the etag reported by the server.
    if not verify:
        for hdr in response.getheaders():
            if hdr[0].lower() == 'etag':
                self._etag = hdr[1]
376
377 @requires_name(InvalidObjectName)
def copy_to(self, container_name, name):
    """
    Copy an object's contents to another location.

    A COPY request is issued for this object with a Destination header
    built from container_name and name. The etag attribute is then set
    from the ETag header of the response, if one is present.

    @param container_name: name of the container to copy into
    @type container_name: str
    @param name: name to give the copy
    @type name: str
    @raise ResponseError: if the response status is outside 200-299
    """
    self._name_check()
    self._name_check(name)

    # Forget a cached etag unless the user assigned it explicitly.
    if not self._etag_override:
        self._etag = None

    request_headers = self._make_headers()
    request_headers.update({
        'Destination': unicode_quote("%s/%s" % (container_name, name)),
        'Content-Length': 0,
    })
    source_path = [self.container.name, self.name]
    response = self.container.conn.make_request(
        'COPY', source_path, hdrs=request_headers, data='')
    response.read()

    if not 200 <= response.status <= 299:
        raise ResponseError(response.status, response.reason)

    for header_name, header_value in response.getheaders():
        if header_name.lower() == 'etag':
            self._etag = header_value
404
405 @requires_name(InvalidObjectName)
407 """
408 Copy another object's contents to this object.
409 """
410
411 self._name_check()
412 self._name_check(name)
413
414
415 if not self._etag_override:
416 self._etag = None
417
418 headers = self._make_headers()
419 headers['X-Copy-From'] = unicode_quote("%s/%s" % (container_name, name))
420 headers['Content-Length'] = 0
421 response = self.container.conn.make_request(
422 'PUT', [self.container.name, self.name], hdrs=headers, data='')
423 buff = response.read()
424
425 if response.status < 200 or response.status > 299:
426 raise ResponseError(response.status, response.reason)
427
428
429 for hdr in response.getheaders():
430 if hdr[0].lower() == 'etag':
431 self._etag = hdr[1]
432
433 @requires_name(InvalidObjectName)
def send(self, iterable):
    """
    Write potentially transient data to the remote storage system using a
    generator or stream.

    A plain string is handed to L{write} and nothing else is done.

    If the object's size is not set, chunked transfer encoding will be
    used to upload the file. Empty chunks produced by the iterable are
    skipped, because a zero-length chunk would end a chunked body.

    If the object's size attribute is set, it will be used as the
    Content-Length. If the generator raises StopIteration prior to
    yielding the right number of bytes, an IncompleteSend exception is
    raised.

    If the content_type attribute is not set then a value of
    application/octet-stream will be used.

    Server-side verification will be performed if an md5 checksum is
    assigned to the etag property before calling this method,
    otherwise no verification will be performed, (verification
    can be performed afterward though by using the etag attribute
    which is set to the value returned by the server).

    >>> test_object = container.create_object('backup.tar.gz')
    >>> pfd = os.popen('tar -czvf - ./data/', 'r')
    >>> test_object.send(pfd)

    @param iterable: stream or generator which yields the content to upload
    @type iterable: generator or stream
    @raise IncompleteSend: if fewer bytes than the size attribute were
                           produced by the iterable
    @raise ResponseError: if the response status is outside 200-299
    """
    self._name_check()

    if isinstance(iterable, basestring):
        # write() buffers the string; returning here keeps it from being
        # uploaded a second time, one character per send.
        self.write(iterable)
        return

    if hasattr(iterable, 'read'):
        # Adapt a file-like object into a generator of chunks.
        def file_iterator(fobj):
            chunk = fobj.read(4095)
            while chunk:
                yield chunk
                chunk = fobj.read(4095)
        iterable = file_iterator(iterable)

    # Forget a cached etag unless the user assigned it explicitly.
    if not self._etag_override:
        self._etag = None

    if not self.content_type:
        self.content_type = 'application/octet-stream'

    path = "/%s/%s/%s" % (self.container.conn.uri.rstrip('/'),
                          unicode_quote(self.container.name),
                          unicode_quote(self.name))
    headers = self._make_headers()
    chunked = self.size is None
    if chunked:
        headers.pop('Content-Length', None)
        headers['Transfer-Encoding'] = 'chunked'
    headers['X-Auth-Token'] = self.container.conn.token
    headers['User-Agent'] = self.container.conn.user_agent
    http = self.container.conn.connection
    http.putrequest('PUT', path)
    for key in headers:
        http.putheader(key, headers[key])
    http.endheaders()

    response = None
    transferred = 0
    try:
        for chunk in iterable:
            if not chunk:
                # Nothing to send; in chunked mode "0\r\n\r\n" would
                # terminate the request body prematurely.
                continue
            if chunked:
                http.send("%X\r\n" % len(chunk))
                http.send(chunk)
                http.send("\r\n")
            else:
                http.send(chunk)
            transferred += len(chunk)
        if chunked:
            # Final zero-length chunk marks the end of the body.
            http.send("0\r\n\r\n")
        elif transferred < self.size:
            raise IncompleteSend()
        response = http.getresponse()
        response.read()
    except timeout:
        if response:
            # Try to drain the response before giving up.
            response.read()
        # Bare raise keeps the original traceback.
        raise

    if (response.status < 200) or (response.status > 299):
        raise ResponseError(response.status, response.reason)

    for hdr in response.getheaders():
        if hdr[0].lower() == 'etag':
            self._etag = hdr[1]
530
532 """
533 Put the contents of the named file into remote storage.
534
535 >>> test_object = container.create_object('file.txt')
536 >>> test_object.content_type = 'text/plain'
537 >>> test_object.load_from_filename('./my_file.txt')
538
539 @param filename: path to the file
540 @type filename: str
541 @param verify: enable/disable server-side checksum verification
542 @type verify: boolean
543 @param callback: function to be used as a progress callback
544 @type callback: callable(transferred, size)
545 """
546 fobj = open(filename, 'rb')
547 self.write(fobj, verify=verify, callback=callback)
548 fobj.close()
549
551 """
552 Initialize the Object with values from the remote service (if any).
553 """
554 if not self.name:
555 return False
556
557 response = self.container.conn.make_request(
558 'HEAD', [self.container.name, self.name])
559 response.read()
560 if response.status == 404:
561 return False
562 if (response.status < 200) or (response.status > 299):
563 raise ResponseError(response.status, response.reason)
564 for hdr in response.getheaders():
565 if hdr[0].lower() == 'x-object-manifest':
566 self.manifest = hdr[1]
567 if hdr[0].lower() == 'content-type':
568 self.content_type = hdr[1]
569 if hdr[0].lower().startswith('x-object-meta-'):
570 self.metadata[hdr[0][14:]] = hdr[1]
571 if hdr[0].lower() == 'etag':
572 self._etag = hdr[1]
573 self._etag_override = False
574 if hdr[0].lower() == 'content-length':
575 self.size = int(hdr[1])
576 if hdr[0].lower() == 'last-modified':
577 self.last_modified = hdr[1]
578 return True
579
582
588
590 """
591 Returns a dictionary representing http headers based on the
592 respective instance attributes.
593 """
594 headers = {}
595 headers['Content-Length'] = (str(self.size) \
596 and str(self.size) != "0") \
597 and str(self.size) or "0"
598 if self.manifest:
599 headers['X-Object-Manifest'] = self.manifest
600 if self._etag:
601 headers['ETag'] = self._etag
602
603 if self.content_type:
604 headers['Content-Type'] = self.content_type
605 else:
606 headers['Content-Type'] = 'application/octet-stream'
607 for key in self.metadata:
608 if len(key) > consts.meta_name_limit:
609 raise(InvalidMetaName(key))
610 if len(self.metadata[key]) > consts.meta_value_limit:
611 raise(InvalidMetaValue(self.metadata[key]))
612 headers['X-Object-Meta-' + key] = self.metadata[key]
613 headers.update(self.headers)
614 return headers
615
616 @classmethod
618 """
619 Given an open file object, returns the md5 hexdigest of the data.
620 """
621 checksum = md5()
622 buff = fobj.read(4096)
623 while buff:
624 checksum.update(buff)
625 buff = fobj.read(4096)
626 fobj.seek(0)
627 return checksum.hexdigest()
628
630 """
631 Retrieve the URI for this object, if its container is public.
632
633 >>> container1 = connection['container1']
634 >>> container1.make_public()
635 >>> container1.create_object('file.txt').write('testing')
636 >>> container1['file.txt'].public_uri()
637 'http://c00061.cdn.cloudfiles.rackspacecloud.com/file.txt'
638
639 @return: the public URI for this object
640 @rtype: str
641 """
642 return "%s/%s" % (self.container.public_uri().rstrip('/'),
643 unicode_quote(self.name))
644
646 """
647 Retrieve the SSL URI for this object, if its container is public.
648
649 >>> container1 = connection['container1']
650 >>> container1.make_public()
651 >>> container1.create_object('file.txt').write('testing')
652 >>> container1['file.txt'].public_ssl_uri()
653 'https://c61.ssl.cf0.rackcdn.com/file.txt'
654
655 @return: the public SSL URI for this object
656 @rtype: str
657 """
658 return "%s/%s" % (self.container.public_ssl_uri().rstrip('/'),
659 unicode_quote(self.name))
660
662 """
663 Retrieve the streaming URI for this object, if its container is public.
664
665 >>> container1 = connection['container1']
666 >>> container1.make_public()
667 >>> container1.create_object('file.txt').write('testing')
668 >>> container1['file.txt'].public_streaming_uri()
669 'https://c61.stream.rackcdn.com/file.txt'
670
671 @return: the public Streaming URI for this object
672 @rtype: str
673 """
674 return "%s/%s" % (self.container.public_streaming_uri().rstrip('/'),
675 unicode_quote(self.name))
676
678 """
679 Purge Edge cache for this object.
680 You will be notified by email if one is provided when the
681 job completes.
682
683 >>> obj.purge_from_cdn("user@domain.com")
684
685 or
686
687 >>> obj.purge_from_cdn("user@domain.com,user2@domain.com")
688
689 or
690
691 >>> obj.purge_from_cdn()
692
693 @param email: A Valid email address
694 @type email: str
695 """
696 if not self.container.conn.cdn_enabled:
697 raise CDNNotEnabled()
698
699 if email:
700 hdrs = {"X-Purge-Email": email}
701 response = self.container.conn.cdn_request('DELETE',
702 [self.container.name, self.name], hdrs=hdrs)
703 else:
704 response = self.container.conn.cdn_request('DELETE',
705 [self.container.name, self.name])
706
707 if (response.status < 200) or (response.status >= 299):
708 raise ResponseError(response.status, response.reason)
709
712 """
713 An iterable results set object for Objects.
714
715 This class implements dictionary- and list-like interfaces.
716 """
def __init__(self, container, objects=None):
    """
    Build the result set from a list of object records.

    Records without a 'name' key are dropped; the remaining records and
    their names are kept in two parallel lists.

    @param container: the container the records belong to
    @type container: L{Container}
    @param objects: object records, each expected to provide a 'name' key
    @type objects: list of dict
    """
    if objects is None:
        objects = []
    names = self._names = []
    records = self._objects = []
    for record in objects:
        try:
            record_name = record['name']
        except KeyError:
            # Unnamed records cannot be looked up later; skip them.
            continue
        names.append(record_name)
        records.append(record)
    self.container = container
731
734
736 return [Object(self.container, object_record=k) \
737 for k in self._objects[i:j]]
738
740 return item in self._objects
741
743 return len(self._objects)
744
746 return 'ObjectResults: %s objects' % len(self._objects)
747 __str__ = __repr__
748
def index(self, value, *args):
    """
    Return the position of the first object named value.

    Any extra positional arguments are passed on to list.index()
    unchanged.

    @param value: the object name to look for
    @type value: str
    @rtype: number
    @return: index of the first matching name
    """
    known_names = self._names
    return known_names.index(value, *args)
754
756 """
757 returns the number of occurrences of value
758 """
759 return self._names.count(value)
760
761
762