Package sword2 :: Module utils
[hide private]
[frames | no frames]

Source Code for Module sword2.utils

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3   
  4  """ 
  5  Utility methods used within the module 
  6  """ 
  7   
  8  from sword2_logging import logging 
  9  utils_l = logging.getLogger(__name__) 
 10   
 11  from time import time 
 12  from datetime import datetime 
 13   
 14  from base64 import b64encode 
 15   
 16  try: 
 17      from hashlib import md5 
 18  except ImportError: 
 19      import md5 
 20   
 21  import mimetypes 
 22   
 23  NS = {} 
 24  NS['dcterms'] = "{http://purl.org/dc/terms/}%s" 
 25  NS['sword'] ="{http://purl.org/net/sword/terms/}%s" 
 26  NS['atom'] = "{http://www.w3.org/2005/Atom}%s" 
 27  NS['app'] = "{http://www.w3.org/2007/app}%s" 
 28   
def get_text(parent, tag, plural=False):
    """Return the text of the children of `parent` that match `tag`.

    Takes an `etree.Element` and a tag name, looks the tag up with
    `parent.findall(tag)` and collects the `.text` of every match.
    Element attributes are ignored; only the text is returned.

    Returns:
        - `None` when no element matches (regardless of `plural`);
        - the bare text value when exactly one element matches and
          `plural` is false;
        - a list of text values, in document order, when several elements
          match, or when `plural` is true and at least one matches.

    An element without text contributes `None` to the result.
    """
    # Gather every value before deciding on the return shape.  Deciding
    # incrementally on the truthiness of the running result would silently
    # drop a leading empty/None text as soon as a second match arrives.
    texts = [item.text for item in parent.findall(tag)]
    if not texts:
        return None
    if len(texts) == 1 and not plural:
        return texts[0]
    return texts
48
def get_md5(data):
    """Return a `(md5 hex digest, size in bytes)` tuple for `data`.

    `data` is either a string or a file-like object.

    An object exposing both `read()` and `seek()` is treated as a file: it
    is consumed in 1Mb chunks from its current position, so large files are
    never held in memory at once, and it is rewound with `seek(0)` before
    returning.  The chunks are fed straight to the hash, so the object is
    expected to yield byte strings.

    Anything else is hashed in one go.  Text (unicode) strings are encoded
    as UTF-8 first, and the reported size is the length of the encoded
    bytes.
    """
    chunk_size = 1024 * 1024  # 1Mb
    m = md5()

    if hasattr(data, "read") and hasattr(data, 'seek'):
        f_size = 0
        chunk = data.read(chunk_size)
        while chunk:
            f_size += len(chunk)
            m.update(chunk)
            chunk = data.read(chunk_size)
        # Leave the stream ready to be read again by the caller.
        data.seek(0)
        return m.hexdigest(), f_size

    # Plain string.  md5 only accepts bytes, so text has to be encoded;
    # byte strings (which have no need for it) are passed through untouched.
    if hasattr(data, "encode") and not isinstance(data, bytes):
        data = data.encode("utf-8")
    m.update(data)
    return m.hexdigest(), len(data)
69 70
class Timer(object):
    """Simple timer, providing a 'stopwatch' mechanism.

    Any number of named timers can be started; `time_since_start` reports
    how long ago each was started and logs that figure in `duration`.

    Attributes:
        counts     -- name -> start time (seconds since the epoch)
        duration   -- name -> list of every elapsed time reported so far
        stop_times -- name -> time at which `stop` was called for the name

    Usage example:

    >>> from sword2.utils import Timer
    >>> from time import sleep
    >>> t = Timer()
    >>> t.get_timestamp()
    datetime.datetime(2011, 6, 7, 7, 40, 53, 87248)
    >>> t.get_loggable_timestamp()
    '2011-06-07T07:40:53.087516'

    >>> # Start a few timers
    ... t.start("kaylee", "river", "inara")
    >>> sleep(3)       # wait a little while
    >>> t.time_since_start("kaylee")
    (0, 3.0048139095306396)

    # tuple -> (index of the logged .duration, time since the .start method was called)
    # eg 't.duration['kaylee'][0]' would equal 3.00481....

    >>> sleep(2)
    >>> t.time_since_start("kaylee", "inara")
    [(1, 5.00858998298645), (0, 5.00858998298645)]
    >>> sleep(5)
    >>> t.time_since_start("kaylee", "river")
    [(2, 10.015379905700684), (0, 10.015379905700684)]
    >>> sleep(4)
    >>> t.time_since_start("kaylee", "inara", "river")
    [(3, 14.021538972854614), (1, 14.021538972854614), (1, 14.021538972854614)]

    # The order of the response is the same as the order of the names in the method call.

    >>> # report back
    ... t.duration['kaylee']
    [3.0048139095306396, 5.00858998298645, 10.015379905700684, 14.021538972854614]
    >>> t.duration['inara']
    [5.00858998298645, 14.021538972854614]
    >>> t.duration['river']
    [10.015379905700684, 14.021538972854614]
    >>>
    """

    def __init__(self):
        self.reset_all()

    def reset_all(self):
        """Discard every start time, logged duration and stop time."""
        self.counts = {}
        self.duration = {}
        # Deliberately not called `stop`: an instance attribute of that name
        # would shadow the `stop()` method and make it uncallable.
        self.stop_times = {}

    def reset(self, name):
        """Set the start time of an existing timer to 0; unknown names are ignored."""
        if name in self.counts:
            self.counts[name] = 0

    def read_raw(self, name):
        """Return the raw start time stored for `name`, or None if never started."""
        return self.counts.get(name, None)

    def read(self, name):
        """Return the start time of `name` as a `datetime`, or None if never started."""
        if name in self.counts:
            return datetime.fromtimestamp(self.counts[name])
        return None

    def start(self, *args):
        """Start (or restart) every named timer at the same instant."""
        st_time = time()
        for arg in args:
            self.counts[arg] = st_time

    def stop(self, *args):
        """Record the current time as the stop time of every named timer."""
        st_time = time()
        for arg in args:
            self.stop_times[arg] = st_time

    def get_timestamp(self):
        """Convenience function: the current local time as a `datetime`."""
        return datetime.now()

    def get_loggable_timestamp(self):
        """Human-readable by intent: the current local time in ISO 8601 form."""
        return datetime.now().isoformat()

    def time_since_start(self, *args):
        """Report and log the time elapsed since each named timer was started.

        For every name, in call order, produces a tuple
        `(index of the new entry in self.duration[name], elapsed seconds)`.
        A name that was never started produces `(0, 0)` and logs nothing.

        Returns the bare tuple when exactly one name is given, otherwise a
        list of tuples (an empty list when called without names).
        """
        now = time()  # one reading, so all names share the same instant
        results = []
        for name in args:
            if name in self.counts:
                elapsed = now - self.counts[name]
                history = self.duration.setdefault(name, [])
                history.append(elapsed)
                results.append((len(history) - 1, elapsed))
            else:
                results.append((0, 0))
        if len(results) == 1:
            return results[0]
        return results
169 170
def get_content_type(filename):
    """Guess a mimetype from the extension of `filename`.

    This is only a simple .ext -> mimetype lookup; it is generally better
    to specify the mimetype upfront.  When no type can be guessed the
    generic 'application/octet-stream' is returned.
    """
    guessed_type, _encoding = mimetypes.guess_type(filename)
    if guessed_type:
        return guessed_type
    return 'application/octet-stream'
175 222