1 """
2 connection operations
3
4 Connection instances are used to communicate with the remote service at
5 the account level creating, listing and deleting Containers, and returning
6 Container instances.
7
8 See COPYING for license information.
9 """
10
11 import socket
12 import os
13 from urllib import quote
14 from httplib import HTTPSConnection, HTTPConnection, HTTPException
15 from container import Container, ContainerResults
16 from utils import parse_url, THTTPConnection, THTTPSConnection
17 from errors import ResponseError, NoSuchContainer, ContainerNotEmpty, \
18 InvalidContainerName, CDNNotEnabled
19 from Queue import Queue, Empty, Full
20 from time import time
21 import consts
22 from authentication import Authentication
23 from fjson import json_loads
24 from sys import version_info
25
26
27
28
29
31 """
32 Manages the connection to the storage system and serves as a factory
33 for Container instances.
34
35 @undocumented: cdn_connect
36 @undocumented: http_connect
37 @undocumented: cdn_request
38 @undocumented: make_request
39 @undocumented: _check_container_name
40 """
41
42 - def __init__(self, username=None, api_key=None, timeout=5, **kwargs):
43 """
44 Accepts keyword arguments for Mosso username and api key.
45 Optionally, you can omit these keywords and supply an
46 Authentication object using the auth keyword. Setting the argument
47 servicenet to True will make use of Rackspace servicenet network.
48
49 @type username: str
50 @param username: a Mosso username
51 @type api_key: str
52 @param api_key: a Mosso API key
53 @type servicenet: bool
54 @param servicenet: Use Rackspace servicenet to access Cloud Files.
55 @type cdn_log_retention: bool
56 @param cdn_log_retention: set logs retention for this cdn enabled
57 container.
58 """
59 self.cdn_enabled = False
60 self.cdn_args = None
61 self.connection_args = None
62 self.cdn_connection = None
63 self.connection = None
64 self.token = None
65 self.debuglevel = int(kwargs.get('debuglevel', 0))
66 self.servicenet = kwargs.get('servicenet', False)
67 self.user_agent = kwargs.get('useragent', consts.user_agent)
68 self.timeout = timeout
69
70
71
72 if not 'servicenet' in kwargs \
73 and 'RACKSPACE_SERVICENET' in os.environ:
74 self.servicenet = True
75
76 self.auth = 'auth' in kwargs and kwargs['auth'] or None
77
78 if not self.auth:
79 authurl = kwargs.get('authurl', consts.us_authurl)
80 if username and api_key and authurl:
81 self.auth = Authentication(username, api_key, authurl=authurl,
82 useragent=self.user_agent)
83 else:
84 raise TypeError("Incorrect or invalid arguments supplied")
85
86 self._authenticate()
87
89 """
90 Authenticate and setup this instance with the values returned.
91 """
92 (url, self.cdn_url, self.token) = self.auth.authenticate()
93 url = self._set_storage_url(url)
94 self.connection_args = parse_url(url)
95
96 if version_info[0] <= 2 and version_info[1] < 6:
97 self.conn_class = self.connection_args[3] and THTTPSConnection or \
98 THTTPConnection
99 else:
100 self.conn_class = self.connection_args[3] and HTTPSConnection or \
101 HTTPConnection
102 self.http_connect()
103 if self.cdn_url:
104 self.cdn_connect()
105
107 if self.servicenet:
108 return "https://snet-%s" % url.replace("https://", "")
109 return url
110
112 """
113 Setup the http connection instance for the CDN service.
114 """
115 (host, port, cdn_uri, is_ssl) = parse_url(self.cdn_url)
116 self.cdn_connection = self.conn_class(host, port, timeout=self.timeout)
117 self.cdn_enabled = True
118
120 """
121 Setup the http connection instance.
122 """
123 (host, port, self.uri, is_ssl) = self.connection_args
124 self.connection = self.conn_class(host, port=port, \
125 timeout=self.timeout)
126 self.connection.set_debuglevel(self.debuglevel)
127
128 - def cdn_request(self, method, path=[], data='', hdrs=None):
129 """
130 Given a method (i.e. GET, PUT, POST, etc), a path, data, header and
131 metadata dicts, performs an http request against the CDN service.
132 """
133 if not self.cdn_enabled:
134 raise CDNNotEnabled()
135
136 path = '/%s/%s' % \
137 (self.uri.rstrip('/'), '/'.join([quote(i) for i in path]))
138 headers = {'Content-Length': str(len(data)),
139 'User-Agent': self.user_agent,
140 'X-Auth-Token': self.token}
141 if isinstance(hdrs, dict):
142 headers.update(hdrs)
143
144 def retry_request():
145 '''Re-connect and re-try a failed request once'''
146 self.cdn_connect()
147 self.cdn_connection.request(method, path, data, headers)
148 return self.cdn_connection.getresponse()
149
150 try:
151 self.cdn_connection.request(method, path, data, headers)
152 response = self.cdn_connection.getresponse()
153 except (socket.error, IOError, HTTPException):
154 response = retry_request()
155 if response.status == 401:
156 self._authenticate()
157 headers['X-Auth-Token'] = self.token
158 response = retry_request()
159
160 return response
161
162 - def make_request(self, method, path=[], data='', hdrs=None, parms=None):
163 """
164 Given a method (i.e. GET, PUT, POST, etc), a path, data, header and
165 metadata dicts, and an optional dictionary of query parameters,
166 performs an http request.
167 """
168 path = '/%s/%s' % \
169 (self.uri.rstrip('/'), '/'.join([quote(i) for i in path]))
170
171 if isinstance(parms, dict) and parms:
172 query_args = \
173 ['%s=%s' % (quote(x),
174 quote(str(y))) for (x, y) in parms.items()]
175 path = '%s?%s' % (path, '&'.join(query_args))
176
177 headers = {'Content-Length': str(len(data)),
178 'User-Agent': self.user_agent,
179 'X-Auth-Token': self.token}
180 isinstance(hdrs, dict) and headers.update(hdrs)
181
182 def retry_request():
183 '''Re-connect and re-try a failed request once'''
184 self.http_connect()
185 self.connection.request(method, path, data, headers)
186 return self.connection.getresponse()
187
188 try:
189 self.connection.request(method, path, data, headers)
190 response = self.connection.getresponse()
191 except (socket.error, IOError, HTTPException):
192 response = retry_request()
193 if response.status == 401:
194 self._authenticate()
195 headers['X-Auth-Token'] = self.token
196 response = retry_request()
197
198 return response
199
201 """
202 Return tuple for number of containers and total bytes in the account
203
204 >>> connection.get_info()
205 (5, 2309749)
206
207 @rtype: tuple
208 @return: a tuple containing the number of containers and total bytes
209 used by the account
210 """
211 response = self.make_request('HEAD')
212 count = size = None
213 for hdr in response.getheaders():
214 if hdr[0].lower() == 'x-account-container-count':
215 try:
216 count = int(hdr[1])
217 except ValueError:
218 count = 0
219 if hdr[0].lower() == 'x-account-bytes-used':
220 try:
221 size = int(hdr[1])
222 except ValueError:
223 size = 0
224 buff = response.read()
225 if (response.status < 200) or (response.status > 299):
226 raise ResponseError(response.status, response.reason)
227 return (count, size)
228
230 if not container_name or \
231 '/' in container_name or \
232 len(container_name) > consts.container_name_limit:
233 raise InvalidContainerName(container_name)
234
236 """
237 Given a container name, returns a L{Container} item, creating a new
238 Container if one does not already exist.
239
240 >>> connection.create_container('new_container')
241 <cloudfiles.container.Container object at 0xb77d628c>
242
243 @param container_name: name of the container to create
244 @type container_name: str
245 @rtype: L{Container}
246 @return: an object representing the newly created container
247 """
248 self._check_container_name(container_name)
249
250 response = self.make_request('PUT', [container_name])
251 buff = response.read()
252 if (response.status < 200) or (response.status > 299):
253 raise ResponseError(response.status, response.reason)
254 return Container(self, container_name)
255
257 """
258 Given a container name, delete it.
259
260 >>> connection.delete_container('old_container')
261
262 @param container_name: name of the container to delete
263 @type container_name: str
264 """
265 if isinstance(container_name, Container):
266 container_name = container_name.name
267 self._check_container_name(container_name)
268
269 response = self.make_request('DELETE', [container_name])
270 buff = response.read()
271
272 if (response.status == 409):
273 raise ContainerNotEmpty(container_name)
274 elif (response.status < 200) or (response.status > 299):
275 raise ResponseError(response.status, response.reason)
276
277 if self.cdn_enabled:
278 response = self.cdn_request('POST', [container_name],
279 hdrs={'X-CDN-Enabled': 'False'})
280
282 """
283 Returns a Container item result set.
284
285 >>> connection.get_all_containers()
286 ContainerResults: 4 containers
287 >>> print ', '.join([container.name for container in
288 connection.get_all_containers()])
289 new_container, old_container, pictures, music
290
291 @rtype: L{ContainerResults}
292 @return: an iterable set of objects representing all containers on the
293 account
294 @param limit: number of results to return, up to 10,000
295 @type limit: int
296 @param marker: return only results whose name is greater than "marker"
297 @type marker: str
298 """
299 if limit:
300 parms['limit'] = limit
301 if marker:
302 parms['marker'] = marker
303 return ContainerResults(self, self.list_containers_info(**parms))
304
306 """
307 Return a single Container item for the given Container.
308
309 >>> connection.get_container('old_container')
310 <cloudfiles.container.Container object at 0xb77d628c>
311 >>> container = connection.get_container('old_container')
312 >>> container.size_used
313 23074
314
315 @param container_name: name of the container to create
316 @type container_name: str
317 @rtype: L{Container}
318 @return: an object representing the container
319 """
320 self._check_container_name(container_name)
321
322 response = self.make_request('HEAD', [container_name])
323 count = size = None
324 for hdr in response.getheaders():
325 if hdr[0].lower() == 'x-container-object-count':
326 try:
327 count = int(hdr[1])
328 except ValueError:
329 count = 0
330 if hdr[0].lower() == 'x-container-bytes-used':
331 try:
332 size = int(hdr[1])
333 except ValueError:
334 size = 0
335 buff = response.read()
336 if response.status == 404:
337 raise NoSuchContainer(container_name)
338 if (response.status < 200) or (response.status > 299):
339 raise ResponseError(response.status, response.reason)
340 return Container(self, container_name, count, size)
341
343 """
344 Returns a list of containers that have been published to the CDN.
345
346 >>> connection.list_public_containers()
347 ['container1', 'container2', 'container3']
348
349 @rtype: list(str)
350 @return: a list of all CDN-enabled container names as strings
351 """
352 response = self.cdn_request('GET', [''])
353 if (response.status < 200) or (response.status > 299):
354 buff = response.read()
355 raise ResponseError(response.status, response.reason)
356 return response.read().splitlines()
357
359 """
360 Returns a list of Containers, including object count and size.
361
362 >>> connection.list_containers_info()
363 [{u'count': 510, u'bytes': 2081717, u'name': u'new_container'},
364 {u'count': 12, u'bytes': 23074, u'name': u'old_container'},
365 {u'count': 0, u'bytes': 0, u'name': u'container1'},
366 {u'count': 0, u'bytes': 0, u'name': u'container2'},
367 {u'count': 0, u'bytes': 0, u'name': u'container3'},
368 {u'count': 3, u'bytes': 2306, u'name': u'test'}]
369
370 @rtype: list({"name":"...", "count":..., "bytes":...})
371 @return: a list of all container info as dictionaries with the
372 keys "name", "count", and "bytes"
373 @param limit: number of results to return, up to 10,000
374 @type limit: int
375 @param marker: return only results whose name is greater than "marker"
376 @type marker: str
377 """
378 if limit:
379 parms['limit'] = limit
380 if marker:
381 parms['marker'] = marker
382 parms['format'] = 'json'
383 response = self.make_request('GET', [''], parms=parms)
384 if (response.status < 200) or (response.status > 299):
385 buff = response.read()
386 raise ResponseError(response.status, response.reason)
387 return json_loads(response.read())
388
390 """
391 Returns a list of Containers.
392
393 >>> connection.list_containers()
394 ['new_container',
395 'old_container',
396 'container1',
397 'container2',
398 'container3',
399 'test']
400
401 @rtype: list(str)
402 @return: a list of all containers names as strings
403 @param limit: number of results to return, up to 10,000
404 @type limit: int
405 @param marker: return only results whose name is greater than "marker"
406 @type marker: str
407 """
408 if limit:
409 parms['limit'] = limit
410 if marker:
411 parms['marker'] = marker
412 response = self.make_request('GET', [''], parms=parms)
413 if (response.status < 200) or (response.status > 299):
414 buff = response.read()
415 raise ResponseError(response.status, response.reason)
416 return response.read().splitlines()
417
419 """
420 Container objects can be grabbed from a connection using index
421 syntax.
422
423 >>> container = conn['old_container']
424 >>> container.size_used
425 23074
426
427 @rtype: L{Container}
428 @return: an object representing the container
429 """
430 return self.get_container(key)
431
432
434 """
435 A thread-safe connection pool object.
436
437 This component isn't required when using the cloudfiles library, but it may
438 be useful when building threaded applications.
439 """
440
441 - def __init__(self, username=None, api_key=None, **kwargs):
442 auth = kwargs.get('auth', None)
443 self.timeout = kwargs.get('timeout', 5)
444 self.connargs = {'username': username, 'api_key': api_key}
445 poolsize = kwargs.get('poolsize', 10)
446 Queue.__init__(self, poolsize)
447
449 """
450 Return a cloudfiles connection object.
451
452 @rtype: L{Connection}
453 @return: a cloudfiles connection object
454 """
455 try:
456 (create, connobj) = Queue.get(self, block=0)
457 except Empty:
458 connobj = Connection(**self.connargs)
459 return connobj
460
461 - def put(self, connobj):
462 """
463 Place a cloudfiles connection object back into the pool.
464
465 @param connobj: a cloudfiles connection object
466 @type connobj: L{Connection}
467 """
468 try:
469 Queue.put(self, (time(), connobj), block=0)
470 except Full:
471 del connobj
472
473