1
2
3
4
5 from urllib import urlencode
6 import httplib2
7
8 from pyworker import STATUSES, Worker, WorkerResponse, WorkerException, JsonMsgParseError
9
10
11
12 from StringIO import StringIO
13 from tempfile import mkstemp
14 from os import remove
15
# NOTE(review): the `class` line and every `def` line were missing from the
# listing this block was rebuilt from.  The names HTTPWorker, _get_tempfile,
# _get_ramfile, httpsetup and endtask are taken from references elsewhere in
# the file; `starttask` and the `Worker` base class are inferred - confirm
# both against pyworker.
class HTTPWorker(Worker):
    """Gets a local copy of the resource at the URL in the JSON msg ('url') and
    hands it to self.endtask as an open file object.

    The default endtask simply reports the first 100 characters of the resource.

    The design intent is that an HTTP cache lets multiple HTTPWorkers performing
    different tasks on the same resource avoid repeated downloads unless the
    E-Tag header is different, in which case the updated resource is downloaded
    and passed on instead.
    NOTE(review): httpsetup builds httplib2.Http() without a cache argument, so
    no caching actually takes place as written - confirm whether a cache
    directory should be configured.

    It is expected that self.endtask will be overwritten.

    If the tempfile option is set, remember to delete the temporary file
    as well as ack the msg! Eg -
    ------------------------------------------------------------
    import os
    class SolrFeeder(HTTPWorker):
        def endtask(self, msg, response):
            try:
                # do stuff with response.context['fd'], the file object for the resource
                pass
            finally:
                response.context['fd'].close()
                if self.context.get('tempfile', False):
                    os.remove(response.context['tempfile'])
                self.queue_stdin.task_done()

    s = SolrFeeder(queue_stdin, queue_stdout=None, tempfile = True)
    ------------------------------------------------------------
    If 'id' is passed in the message instead, then this is inserted into a template, set
    by instantiating this worker with the parameter 'http_template'. Normal python
    string formatting applies ( template % id )

    Configuration parameters (read from self.context):
    http_template = template for the URL to GET (needed only for 'id' messages)
    tempfile      = if true, spool the resource to an on-disk temporary file
                    instead of an in-memory buffer
    """

    def _get_tempfile(self):
        """Return (file_object, path) for a new on-disk temporary file."""
        # NOTE(review): this method's body was absent from the listing; it is
        # reconstructed from its use in httpsetup and the mkstemp import.
        from os import fdopen
        handle, path = mkstemp()
        # mkstemp() returns an OS-level descriptor (an int); wrap it so callers
        # get the write/seek/read/close methods that starttask/endtask rely on.
        return (fdopen(handle, 'w+b'), path)

    def _get_ramfile(self):
        """Return (file_object, None) for an in-memory buffer (no path on disk)."""
        return (StringIO(), None)

    def httpsetup(self):
        """Read the worker configuration and create the HTTP client.

        Sets self.http_template, self.h, self.method, self.data_method,
        self.tempfile (the factory used to obtain a destination file) and
        finally marks the worker as set up.
        """
        self.http_template = self.context.get('http_template', None)
        self.h = httplib2.Http()
        self.method = self.context.get('method', 'GET')
        # NOTE(review): this reads the same 'method' key as the line above, only
        # with a different default; neither attribute is used by starttask.
        # Left unchanged - confirm whether a 'data_method' key was intended.
        self.data_method = self.context.get('method', 'GETURL')
        if self.context.get('tempfile', False):
            self.tempfile = self._get_tempfile
        else:
            self.tempfile = self._get_ramfile

        self.setup = True

    def starttask(self, msg):
        """This will very simply GET the url supplied and pass the temp/ramfile to endtask.

        msg is decoded with self.parse_json_msg.  The URL comes from the 'url'
        key, or from 'id' formatted into self.http_template; optional 'headers'
        are sent with the request.

        Returns a WorkerResponse: the COMPLETE status with fd, tempfile, jmsg
        and url in its context on success, or the FAIL status (carrying the
        exception, if one was raised) otherwise.
        """
        # NOTE(review): the listing used bare FAIL / COMPLETE, which are neither
        # imported nor defined in this module, so every return raised
        # NameError.  This assumes STATUSES maps status names to codes -
        # confirm against pyworker.
        fd = None
        name = None
        try:
            # self.setup is only assigned by httpsetup(); use getattr so a
            # worker that was never set up is initialised instead of failing.
            if not getattr(self, 'setup', False):
                self.httpsetup()
            jmsg = self.parse_json_msg(msg)

            headers = {}
            if 'headers' in jmsg:
                headers = jmsg['headers']
            url = None
            if 'url' in jmsg:
                url = jmsg['url']
            elif 'id' in jmsg and self.http_template:
                url = self.http_template % jmsg['id']
            else:
                return WorkerResponse(STATUSES['FAIL'])
            if not url:
                raise Exception("url not supplied")
            # Create the destination only once the request is known to be
            # possible, so rejected messages do not leave temp files behind.
            (fd, name) = self.tempfile()
            # httplib2 returns a (response, content) pair; only the body is
            # stored.  Request the resolved url so 'id' messages work too.
            (http_response, content) = self.h.request(url, "GET", headers=headers)
            fd.write(content)
            fd.seek(0)
            return WorkerResponse(STATUSES['COMPLETE'], fd=fd, tempfile=name, jmsg=jmsg, url=url)
        except Exception as e:
            # Nothing downstream receives the file on failure, so release it here.
            if fd is not None:
                try:
                    fd.close()
                    if name:
                        remove(name)
                except (IOError, OSError):
                    # best-effort cleanup; report the original failure instead
                    pass
            return WorkerResponse(STATUSES['FAIL'], exception=e)

    def endtask(self, msg, response):
        """Demo method to be overwritten. This simply reads the first 100 characters from
        the response.context['fd'] (file-handle) and deletes/removes the file.

        The characters are put on self.queue_stdout when it is set, otherwise
        printed.  The message is always acknowledged on self.queue_stdin.
        """
        try:
            first_bit = response.context['fd'].read(100)
            if self.queue_stdout:
                self.queue_stdout.put(first_bit)
            else:
                print("From url: %s, first 100 chars: \n %s" % (response.context['url'], first_bit))
        finally:
            try:
                response.context['fd'].close()
                if self.context.get('tempfile', False):
                    remove(response.context['tempfile'])
            finally:
                # Ack even if closing/removing the file failed.
                self.queue_stdin.task_done()
110