Package pyworker :: Module httpworker
[hide private]
[frames] | [no frames]

Source Code for Module pyworker.httpworker

  1  #!/usr/bin/python 
  2  # -*- coding: utf-8 -*- 
  3   
  4  # For the HTTPWorker 
  5  from urllib import urlencode 
  6  import httplib2 
  7   
  8  from pyworker import STATUSES, Worker, WorkerResponse, WorkerException, JsonMsgParseError 
  9   
 10  # For tempfile/ramfile handling: 
 11  # Unicode is highly likely, therefore StringIO > cStringIO 
 12  from StringIO import StringIO 
 13  from tempfile import mkstemp 
 14  from os import remove 
 15   
class HTTPWorker(Worker):
    """Fetch the resource named in a JSON msg and hand a file object for it to endtask.

    The resource URL comes from the message's 'url' key. If 'id' is passed in
    the message instead, it is inserted into the template set by instantiating
    this worker with the parameter 'http_template'. Normal Python string
    formatting applies ( template % id ). An optional 'headers' dict in the
    message is sent with the request.

    The demo endtask simply prints (or puts on queue_stdout) the first 100
    characters of the resource. It is expected that self.endtask will be
    overridden.

    If an 'http_cache' setting is given (passed to httplib2.Http as its cache,
    e.g. a directory path), httplib2 caches responses and revalidates them,
    e.g. via the ETag header. Workers sharing that cache therefore do not
    re-download an unchanged resource. Without 'http_cache', every request goes
    to the network.

    If the tempfile option is set, the resource is buffered in a file on disk
    instead of in RAM. Remember to delete the temporary file as well as ack the
    msg! E.g.:

        import os
        class SolrFeeder(HTTPWorker):
            def endtask(self, msg, response):
                try:
                    pass  # do stuff with response.context['fd'], the file object for the resource
                finally:
                    response.context['fd'].close()
                    if self.context.get('tempfile', False):
                        os.remove(response.context['tempfile'])
                    self.queue_stdin.task_done()

        s = SolrFeeder(queue_stdin, queue_stdout=None, tempfile=True)

    Configuration parameters (read from self.context):
        http_template -- template for the URL to fetch when a msg carries 'id'
        http_cache    -- optional cache for httplib2.Http (default: no cache)
        method        -- HTTP method to use (default 'GET')
        data_method   -- stored as self.data_method (default 'GETURL'); not used here
        tempfile      -- if truthy, buffer downloads on disk instead of in RAM
    """

    def _get_tempfile(self):
        """Return (file object, path) for a new on-disk temporary file."""
        from os import fdopen
        handle, name = mkstemp()
        # mkstemp returns a raw OS-level descriptor (an int). Wrap it so callers
        # can write()/seek()/read()/close() it just like the StringIO ramfile.
        return fdopen(handle, 'w+b'), name

    def _get_ramfile(self):
        """Return (in-memory file object, None); there is no path to clean up."""
        return (StringIO(), None)

    def _discard(self, fd, name):
        """Best-effort close/delete of a buffer made by self.tempfile() after a failure."""
        # Called while already handling another error: a cleanup failure must
        # not mask the original exception, so it is deliberately ignored.
        if fd is not None:
            try:
                fd.close()
            except Exception:
                pass
        if name:
            try:
                remove(name)
            except OSError:
                pass

    def httpsetup(self):
        """Read the HTTP settings from self.context and mark the worker as set up."""
        self.http_template = self.context.get('http_template', None)
        # httplib2 only caches/revalidates when it is given a cache; None keeps
        # the previous no-cache behaviour.
        self.h = httplib2.Http(cache=self.context.get('http_cache', None))
        self.method = self.context.get('method', 'GET')
        # Read its own key; reading 'method' here made data_method mirror method.
        self.data_method = self.context.get('data_method', 'GETURL')
        if self.context.get('tempfile', False):
            self.tempfile = self._get_tempfile
        else:
            self.tempfile = self._get_ramfile
        self.setup = True

    def starttask(self, msg):
        """Fetch the msg's URL and return a WorkerResponse wrapping a file of the body.

        On success the response context carries: fd (file object positioned at
        0), tempfile (path or None), jmsg (parsed msg) and url.
        If the msg has neither 'url' nor a usable 'id', a bare FAIL response is
        returned. Any other error (bad JSON, empty url, network error, HTTP
        status >= 400) yields a FAIL response with context 'exception'. Any
        partially written buffer is closed and removed in that case.
        """
        # NOTE(review): FAIL / COMPLETE were used as bare names but never
        # imported into this module (NameError). This assumes pyworker.STATUSES
        # maps status names to status values -- confirm against pyworker.
        fd, name = None, None
        try:
            # 'setup' may not exist yet if the base class never initialised it.
            if not getattr(self, 'setup', False):
                self.httpsetup()
            jmsg = self.parse_json_msg(msg)
            # Prepare HTTP request
            headers = {}
            if 'headers' in jmsg:
                headers = jmsg['headers']
            if 'url' in jmsg:
                url = jmsg['url']
            elif 'id' in jmsg and self.http_template:
                url = self.http_template % jmsg['id']
            else:
                return WorkerResponse(STATUSES['FAIL'])
            if not url:
                raise Exception("url not supplied")
            # httplib2 returns (response, content); only the content is the resource.
            resp, content = self.h.request(url, self.method, headers=headers)
            if resp.status >= 400:
                raise Exception("HTTP %s when fetching %s" % (resp.status, url))
            # Create the buffer only once there is something to put in it, so
            # early failures leave no stray temp files behind.
            fd, name = self.tempfile()
            fd.write(content)
            fd.seek(0)
            return WorkerResponse(STATUSES['COMPLETE'], fd=fd, tempfile=name, jmsg=jmsg, url=url)
        except Exception as e:
            self._discard(fd, name)
            return WorkerResponse(STATUSES['FAIL'], exception=e)

    def endtask(self, msg, response):
        """Demo method to be overridden.

        Reads the first 100 characters from response.context['fd'] (file
        object) and puts them on queue_stdout, or prints them if there is no
        output queue. It then closes the file, deletes it if the tempfile
        option is set, and always acks the msg on queue_stdin.
        """
        # Assumes response.context is the dict of keyword args passed to
        # WorkerResponse; a FAIL response may carry no 'fd'/'tempfile'.
        context = response.context
        fd = context.get('fd')
        try:
            if fd is not None:
                first_bit = fd.read(100)
                if self.queue_stdout:
                    self.queue_stdout.put(first_bit)
                else:
                    print("From url: %s, first 100 chars: \n %s" % (context.get('url'), first_bit))
        finally:
            try:
                if fd is not None:
                    fd.close()
                tempname = context.get('tempfile')
                if self.context.get('tempfile', False) and tempname:
                    remove(tempname)
            finally:
                # Always ack, even if cleanup failed, so queue joins do not hang.
                self.queue_stdin.task_done()
110