1
2
3
4
5
6 """
7 An implementation of a freenet client library for
8 FCP v2, offering considerable flexibility.
9
10 Clients should instantiate FCPNode, then execute
11 its methods to perform tasks with FCP.
12
13 This module was written by aum, May 2006, released under the GNU Lesser General
14 Public License.
15
16 No warranty, yada yada
17
18 For FCP documentation, see http://wiki.freenetproject.org/FCPv2
19
20 """
21
22
23
24 import Queue
25 import base64
26 import mimetypes
27 import os
28 import pprint
29 import random
30 import select
31 import hashlib
32 import socket
33 import stat
34 import sys
35 import tempfile
36 import thread
37 import threading
38 import time
39 import traceback
40 import re
41 import unicodedata
42
43 import pseudopythonparser
44
# Seconds to sleep between polls while the file-by-file insert loop
# waits for in-flight insert jobs to complete.
_pollInterval = 0.03
46
47
48
50 """
51 cannot connect to given host/port
52 """
53
55 """
56 The following code would pose a privacy risk
57 """
58
60
62
63 if not info:
64 info = kw
65 self.info = info
66
67 Exception.__init__(self, str(info))
68
70
71 parts = []
72 for k in ['header', 'ShortCodeDescription', 'CodeDescription']:
73 if self.info.has_key(k):
74 parts.append(str(self.info[k]))
75 return ";".join(parts) or "??"
76
79
82
85
87 """
88 node seems to have died
89 """
90
92 """
93 timed out waiting for command to be sent to node
94 """
95 pass
96
98 """
99 timed out waiting for node to respond
100 """
101
103 """
104 name services name lookup failed
105 """
106
107
108
109
# Where to find the node's FCP and FProxy services. Each default can be
# overridden through the environment variables FCP_HOST, FCP_PORT,
# FPROXY_HOST and FPROXY_PORT; surrounding whitespace in the environment
# values is ignored and the ports are always stored as ints.
defaultFCPHost = os.environ.get("FCP_HOST", "127.0.0.1").strip()
defaultFCPPort = int(str(os.environ.get("FCP_PORT", 9481)).strip())
defaultFProxyHost = os.environ.get("FPROXY_HOST", "127.0.0.1").strip()
defaultFProxyPort = int(str(os.environ.get("FPROXY_PORT", 8888)).strip())

# NOTE(review): presumably the timeout, in seconds, used when polling the
# node socket - confirm against the manager thread.
pollTimeout = 0.1

# NOTE(review): presumably the FCP message fields whose values get converted
# to int when a message is parsed - confirm against the message reader.
intKeys = [
    'DataLength', 'Code',
]

# NOTE(review): presumably the FCP protocol version announced in the hello
# handshake - confirm against _hello().
expectedVersion = "2.0"

# Logger verbosity levels, from quietest to noisiest. These are the values
# passed as the first argument to _log().
SILENT = 0
FATAL = 1
CRITICAL = 2
ERROR = 3
INFO = 4
DETAIL = 5
DEBUG = 6
NOISY = 7

# NOTE(review): presumably the FCP peer-note type code for a private darknet
# comment - confirm against the FCP spec.
PEER_NOTE_PRIVATE_DARKNET_COMMENT = 1

# Verbosity used when the caller does not pass a 'verbosity' keyword.
defaultVerbosity = ERROR

# Number of seconds in a (365-day) year; the default job timeout for
# get() and put().
ONE_YEAR = 86400 * 365

# NOTE(review): presumably the version of this client library - confirm.
fcpVersion = "0.2.5"
160
161
162
163
164
165
167 """
168 Represents an interface to a freenet node via its FCP port,
169 and exposes primitives for the basic genkey, get, put and putdir
170 operations as well as peer management primitives.
171
172 Only one instance of FCPNode is needed across an entire
173 running client application, because its methods are quite thread-safe.
174 Creating 2 or more instances is a waste of resources.
175
176 Clients, when invoking methods, have several options regarding flow
177 control and event notification:
178
179 - synchronous call (the default). Here, no pending status messages
180 will ever be seen, and the call will only return control when it has
181 completed (successfully, or otherwise)
182
183 - asynchronous call - this is invoked by passing the keyword argument
184 'async=True' to any of the main primitives. When a primitive is invoked
185 asynchronously, it will return a 'job ticket object' immediately. This
186 job ticket has methods for polling for job completion, or blocking
187 awaiting completion
188
189 - setting a callback. You can pass to any of the primitives a
190 'callback=somefunc' keyword arg, where 'somefunc' is a callable object
191 conforming to 'def somefunc(status, value)'
192
193 The callback function will be invoked when a primitive succeeds or fails,
194 as well as when a pending message is received from the node.
195
196 The 'status' argument passed will be one of:
197 - 'successful' - the primitive succeeded, and 'value' will contain
198 the result of the primitive
199 - 'pending' - the primitive is still executing, and 'value' will
200 contain the raw pending message sent back from the node, as a
201 dict
202 - 'failed' - the primitive failed, and as with 'pending', the
203 argument 'value' contains a dict containing the message fields
204 sent back from the node
205
206 Note that callbacks can be set in both synchronous and asynchronous
207 calling modes.
208
209 """
210
211 svnLongRevision = "$Revision$"
212 svnRevision = svnLongRevision[ 11 : -2 ]
213
214
215
216 noCloseSocket = True
217
218 nodeIsAlive = False
219
220 nodeVersion = None;
221 nodeFCPVersion = None;
222 nodeBuild = None;
223 nodeRevision = None;
224 nodeExtBuild = None;
225 nodeExtRevision = None;
226 nodeIsTestnet = None;
227 compressionCodecs = [("GZIP", 0), ("BZIP2", 1), ("LZMA", 2)];
228
229
230
231
233 """
234 Create a connection object
235
236 Keyword Arguments:
237 - name - name of client to use with reqs, defaults to random. This
238 is crucial if you plan on making persistent requests
239 - host - hostname, defaults to environment variable FCP_HOST, and
240 if this doesn't exist, then defaultFCPHost
241 - port - port number, defaults to environment variable FCP_PORT, and
242 if this doesn't exist, then defaultFCPPort
243 - logfile - a pathname or writable file object, to which log messages
244 should be written, defaults to stdout unless logfunc is specified
245 - logfunc - a function to which log messages should be written or None
246 for no such function should be used, defaults to None
247 - verbosity - how detailed the log messages should be, defaults to
248 defaultVerbosity (ERROR)
249 - socketTimeout - value to pass to socket object's settimeout() if
250 available and the value is not None, defaults to None
251
252 Attributes of interest:
253 - jobs - a dict of currently running jobs (persistent and nonpersistent).
254 keys are job ids and values are JobTicket objects
255
256 Notes:
257 - when the connection is created, a 'hello' handshake takes place.
258 After that handshake, the node sends back a list of outstanding persistent
259 requests left over from the last connection (based on the value of
260 the 'name' keyword passed into this constructor).
261
262 This object then wraps all this info into JobTicket instances and stores
263 them in the self.persistentJobs dict
264
265 """
266
267 self.running = False
268 self.nodeIsAlive = False
269 self.testedDDA = {}
270
271
272 env = os.environ
273 self.name = kw.get('name', self._getUniqueId())
274 self.host = kw.get('host', env.get("FCP_HOST", defaultFCPHost))
275 self.port = kw.get('port', env.get("FCP_PORT", defaultFCPPort))
276 self.port = int(self.port)
277 self.socketTimeout = kw.get('socketTimeout', None)
278
279
280 self.connectionidentifier = None
281
282
283 logfile = kw.get('logfile', None)
284 logfunc = kw.get('logfunc', None)
285 if(None == logfile and None == logfunc):
286 logfile = sys.stdout
287 if(None != logfile and not hasattr(logfile, 'write')):
288
289 if not isinstance(logfile, str):
290 raise Exception("Bad logfile '%s', must be pathname or file object" % logfile)
291 logfile = file(logfile, "a")
292 self.logfile = logfile
293 self.logfunc = logfunc
294 self.verbosity = kw.get('verbosity', defaultVerbosity)
295
296
297 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
298 if(None != self.socketTimeout):
299 try:
300 self.socket.settimeout(self.socketTimeout)
301 except Exception, e:
302
303 pass
304 try:
305 self.socket.connect((self.host, self.port))
306 except Exception, e:
307 raise Exception("Failed to connect to %s:%s - %s" % (self.host,
308 self.port,
309 e))
310
311
312 self._hello()
313 self.nodeIsAlive = True
314
315
316 self.jobs = {}
317 self.keepJobs = []
318
319
320 self.clientReqQueue = Queue.Queue()
321
322
323 self.running = True
324 self.shutdownLock = threading.Lock()
325 thread.start_new_thread(self._mgrThread, ())
326
327
328 namesitefile = kw.get('namesitefile', None)
329 self.namesiteInit(namesitefile)
330
331
332
334 """
335 object is getting cleaned up, so disconnect
336 """
337
338 try:
339 self.shutdown()
340 except:
341 traceback.print_exc()
342 pass
343
344
345
346
347
348
349
351 """
352 Generates and returns an SSK keypair
353
354 Keywords:
355 - async - whether to do this call asynchronously, and
356 return a JobTicket object
357 - callback - if given, this should be a callable which accepts 2
358 arguments:
359 - status - will be one of 'successful', 'failed' or 'pending'
360 - value - depends on status:
361 - if status is 'successful', this will contain the value
362 returned from the command
363 - if status is 'failed' or 'pending', this will contain
364 a dict containing the response from node
365 - usk - default False - if True, returns USK uris
366 - name - the path to put at end, optional
367 """
368 id = kw.pop("id", None)
369 if not id:
370 id = self._getUniqueId()
371
372 pub, priv = self._submitCmd(id, "GenerateSSK", Identifier=id, **kw)
373
374 name = kw.get("name", None)
375 if name:
376 pub = pub + name
377 priv = priv + name
378
379 if kw.get("usk", False):
380 pub = pub.replace("SSK@", "USK@")+"/0"
381 priv = priv.replace("SSK@", "USK@")+"/0"
382
383 return pub, priv
384
385
386
388 """
389 Sends an FCPPluginMessage and returns FCPPluginReply message contents
390
391 Keywords:
392 - async - whether to do this call asynchronously, and
393 return a JobTicket object
394 - callback - if given, this should be a callable which accepts 2
395 arguments:
396 - status - will be one of 'successful', 'failed' or 'pending'
397 - value - depends on status:
398 - if status is 'successful', this will contain the value
399 returned from the command
400 - if status is 'failed' or 'pending', this will contain
401 a dict containing the response from node
402 - plugin_name - A name to identify the plugin. The same as class name
403 shown on plugins page.
404 - plugin_params - a dict() containing the key-value pairs to be sent
405 to the plugin as parameters
406 """
407
408 id = kw.pop("id", None)
409 if not id:
410 id = self._getUniqueId()
411
412 params = dict(PluginName = kw.get('plugin_name'),
413 Identifier = id,
414 async = kw.get('async',False),
415 callback = kw.get('callback',None))
416
417 for key, val in kw.get('plugin_params',{}).iteritems():
418 params.update({'Param.%s' % str(key) : val})
419
420 return self._submitCmd(id, "FCPPluginMessage", **params)
421
422
def get(self, uri, **kw):
    """
    Does a direct get of a key

    Arguments:
        - uri - the key to fetch. A leading 'freenet:' is stripped. If what
          remains does not start with one of 'SSK@', 'KSK@', 'CHK@', 'USK@'
          or 'SVK@', the part before the first '/' is treated as a
          name-service domain and resolved through namesiteLookup()

    Keywords:
        - id - the job identifier, defaults to a freshly generated unique id
        - async - whether to return immediately with a job ticket object,
          default False (wait for completion)
        - waituntilsent - default False, passed through to the job machinery
        - callback - if given, passed through to the job machinery
        - persistence - default 'connection' - the kind of persistence for
          this request. Other valid values are 'reboot' and 'forever', as
          per FCP spec
        - Global - default false - if evaluates to true, puts this request
          on the global queue. Note the capital G in Global. If you set this,
          persistence must be 'reboot' or 'forever'
        - Verbosity - default 0 - sets the Verbosity mask passed in the
          FCP message - case-sensitive
        - priority - the PriorityClass for retrieval, default 2, may be
          between 0 (highest) to 6 (lowest)
        - maxretries - sent as MaxRetries, default -1
        - maxsize - sent as MaxSize, default "1000000000000"

        - dsonly - whether to only check local datastore
        - ignoreds - don't check local datastore

        - file - if given, this is a pathname to which to store the
          retrieved key
        - followRedirect - follow a redirect if true, otherwise fail the get
        - nodata - if true, no data will be returned. This can be a useful
          test of whether a key is retrievable, without having to consume
          resources by retrieving it
        - stream - if given, this is a writeable file object, to which the
          received data should be written a chunk at a time
        - timeout - timeout for completion, in seconds, default one year

    Returns whatever _submitCmd() yields for the ClientGet job. As described
    by the original author, for a synchronous call this is:
        - if 'file' is given, (mimetype, pathname)
        - if 'nodata' is true, (mimetype, 1)
        - if 'stream' is given, (mimetype, None), because all the data will
          have been written to the stream
        - otherwise (mimetype, data, msg)

    Raises:
        - Exception if Global is set while persistence is 'connection'
        - FCPNameLookupFailure if a name-service domain cannot be resolved
    """
    self._log(INFO, "get: uri=%s" % uri)
    self._log(DETAIL, "get: kw=%s" % kw)

    # ---------------------------------
    # format the request
    opts = {}

    jobId = kw.pop("id", None)
    if not jobId:
        jobId = self._getUniqueId()

    opts['async'] = kw.pop('async', False)
    opts['followRedirect'] = kw.pop('followRedirect', False)
    opts['waituntilsent'] = kw.get('waituntilsent', False)
    if 'callback' in kw:
        opts['callback'] = kw['callback']
    opts['Persistence'] = kw.pop('persistence', 'connection')

    opts['Global'] = "true" if kw.get('Global', False) else "false"
    if opts['Global'] == "true":
        # goes through the logger rather than polluting stdout
        self._log(DETAIL, "get: using the global queue")

    opts['Verbosity'] = kw.get('Verbosity', 0)

    if opts['Global'] == 'true' and opts['Persistence'] == 'connection':
        raise Exception("Global requests must be persistent")

    # choose how the data should come back
    filename = kw.pop("file", None)
    if filename:
        # the node is told an absolute path
        filename = os.path.abspath(filename)
        opts['ReturnType'] = "disk"
        opts['Filename'] = filename

        # NOTE(review): presumably negotiates direct disk access for the
        # target directory so the node may write there - confirm against
        # testDDA()
        self.testDDA(Directory=os.path.dirname(filename),
                     WantWriteDirectory=True)
    elif kw.get('nodata', False):
        opts['ReturnType'] = "none"
    else:
        opts['ReturnType'] = "direct"
        if 'stream' in kw:
            opts['stream'] = kw['stream']

    opts['Identifier'] = jobId

    opts["IgnoreDS"] = "true" if kw.get("ignoreds", False) else "false"
    opts["DSOnly"] = "true" if kw.get("dsonly", False) else "false"

    # ---------------------------------
    # process uri, including possible name-service lookups
    uri = uri.split("freenet:")[-1]
    if len(uri) < 5 or uri[:4] not in ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@'):
        # not a raw key, so treat the leading path component as a domain
        domain, _sep, rest = uri.partition("/")

        tgtUri = self.namesiteLookup(domain)
        if not tgtUri:
            raise FCPNameLookupFailure(
                "Failed to resolve freenet domain '%s'" % domain)
        if rest:
            uri = (tgtUri + "/" + rest).replace("//", "/")
        else:
            uri = tgtUri

    opts['URI'] = uri

    opts['MaxRetries'] = kw.get("maxretries", -1)
    opts['MaxSize'] = kw.get("maxsize", "1000000000000")
    opts['PriorityClass'] = int(kw.get("priority", 2))

    opts['timeout'] = int(kw.pop("timeout", ONE_YEAR))

    # ---------------------------------
    # now enqueue the request
    return self._submitCmd(jobId, "ClientGet", **opts)
560
561
562
def put(self, uri="CHK@", **kw):
    """
    Inserts a key

    Arguments:
        - uri - uri under which to insert the key. A leading 'freenet:' is
          stripped. If what remains does not start with one of 'SSK@',
          'KSK@', 'CHK@', 'USK@' or 'SVK@', the part before the first '/'
          is treated as a name-service domain and resolved through
          namesiteLookup()

    Keywords - you must specify one of the following to choose an insert mode:
        - file - path of file from which to read the key data
        - data - the raw data of the key as string
        - dir - the directory to insert, for freesite insertion; the whole
          call is then handed over to putdir()
        - redirect - the target URI to redirect to

    Keywords for 'data' mode:
        - name - human-readable target filename, sent as TargetFilename

    Keywords for 'dir' mode:
        - see putdir()

    Keywords for 'file' and 'data' modes:
        - chkonly - only generate CHK, don't insert - default false
        - dontcompress - do not compress on insert - default false.
          The older spelling 'nocompress' is still accepted and wins if
          both are given

    Keywords for 'file', 'data' and 'redirect' modes:
        - mimetype - the mime type; if omitted it is guessed from the uri
          or from the 'file' name

    Keywords valid for all modes:
        - id - the job identifier, defaults to a freshly generated unique id
        - async - whether to do the job asynchronously, returning a job
          ticket object (default False)
        - waituntilsent - default False, if True, and if async=True, waits
          until the command has been sent to the node before returning a
          job object
        - callback - if given, passed through to the job machinery
        - keep - default False, passed through to the job machinery
        - persistence - default 'connection' - the kind of persistence for
          this request. Other valid values are 'reboot' and 'forever', as
          per FCP spec. The capitalised spelling 'Persistence' is accepted
          as an alias when 'persistence' is absent
        - Global - default false - if evaluates to true, puts this request
          on the global queue. Note the capital G in Global. If you set this,
          persistence must be 'reboot' or 'forever'
        - Verbosity - default 0 - sets the Verbosity mask passed in the
          FCP message - case-sensitive
        - Codecs - compression codecs string, defaults to
          defaultCompressionCodecsString()
        - TargetFilename - if given, sent as TargetFilename (overrides 'name')

        - maxretries - sent as MaxRetries, default -1
        - priority - the PriorityClass for the insert, default 3, may be
          between 0 (highest) to 6 (lowest)
        - realtime true/false - sets the RealTimeFlag field

        - timeout - timeout for completion, in seconds, default one year

    Returns whatever _submitCmd() yields for the ClientPut job (or whatever
    putdir() returns in 'dir' mode)

    Raises:
        - Exception if Global is set while persistence is 'connection'
        - FCPNameLookupFailure if a name-service domain cannot be resolved
        - Exception if none of 'file', 'data' or 'redirect' is given and
          chkonly is not set

    Notes:
        - at most one of 'file', 'data' and 'redirect' takes effect; they
          are checked in that order
    """
    # divert to putdir if dir keyword present
    if 'dir' in kw:
        self._log(DETAIL, "put => putdir")
        return self.putdir(uri, **kw)

    # ---------------------------------
    # format the request
    opts = {}

    opts['async'] = kw.get('async', False)
    opts['waituntilsent'] = kw.get('waituntilsent', False)
    opts['keep'] = kw.get('keep', False)
    if 'callback' in kw:
        opts['callback'] = kw['callback']

    self._log(DETAIL, "put: uri=%s async=%s waituntilsent=%s" % (
        uri, opts['async'], opts['waituntilsent']))

    # putdir() hands the value over as 'Persistence', so honour that
    # spelling too instead of silently falling back to 'connection'
    if 'persistence' in kw:
        opts['Persistence'] = kw.pop('persistence')
    else:
        opts['Persistence'] = kw.pop('Persistence', 'connection')

    opts['Global'] = "true" if kw.get('Global', False) else "false"

    if opts['Global'] == 'true' and opts['Persistence'] == 'connection':
        raise Exception("Global requests must be persistent")

    # ---------------------------------
    # process uri, including possible name-service lookups
    uri = uri.split("freenet:")[-1]
    if len(uri) < 4 or uri[:4] not in ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@'):
        # not a raw key, so treat the leading path component as a domain
        domain, _sep, rest = uri.partition("/")

        tgtUri = self.namesiteLookup(domain)
        if not tgtUri:
            raise FCPNameLookupFailure(
                "Failed to resolve freenet domain '%s'" % domain)
        if rest:
            uri = (tgtUri + "/" + rest).replace("//", "/")
        else:
            uri = tgtUri

    opts['URI'] = uri

    # ---------------------------------
    # determine a mimetype
    mimetype = kw.get("mimetype", None)
    if mimetype is None:
        # not given explicitly: pick the best available name to guess from
        if os.path.splitext(uri)[1]:
            # the uri carries an extension, use its basename
            guessFrom = os.path.basename(uri)
        elif kw.get('file', None) is not None:
            guessFrom = os.path.basename(kw['file'])
        else:
            # fall back on the bare uri
            guessFrom = uri
        mimetype = guessMimetype(guessFrom)

    opts['Metadata.ContentType'] = mimetype

    jobId = kw.pop("id", None)
    if not jobId:
        jobId = self._getUniqueId()
    opts['Identifier'] = jobId

    chkOnly = toBool(kw.get("chkonly", "false"))

    opts['Verbosity'] = kw.get('Verbosity', 0)
    opts['MaxRetries'] = kw.get("maxretries", -1)
    opts['PriorityClass'] = kw.get("priority", 3)
    opts['RealTimeFlag'] = toBool(kw.get("realtime", "false"))
    opts['GetCHKOnly'] = chkOnly
    # 'nocompress' is the historical keyword; 'dontcompress' is the one the
    # documentation has always advertised
    opts['DontCompress'] = toBool(
        kw.get("nocompress", kw.get("dontcompress", "false")))
    opts['Codecs'] = kw.get('Codecs',
                            self.defaultCompressionCodecsString())

    # ---------------------------------
    # fill in the mode-specific fields
    if "file" in kw:
        filepath = os.path.abspath(kw['file'])
        opts['UploadFrom'] = "disk"
        opts['Filename'] = filepath
        if "mimetype" not in kw:
            opts['Metadata.ContentType'] = guessMimetype(kw['file'])
        # base64 of the digest computed by sha256dda() over the connection
        # identifier, the job id and the file
        opts['FileHash'] = base64.encodestring(
            sha256dda(self.connectionidentifier, jobId,
                      path=filepath))

    elif "data" in kw:
        opts["UploadFrom"] = "direct"
        opts["Data"] = kw['data']
        targetFilename = kw.get('name')
        if targetFilename:
            opts["TargetFilename"] = targetFilename

    elif "redirect" in kw:
        opts["UploadFrom"] = "redirect"
        opts["TargetURI"] = kw['redirect']
    elif chkOnly != "true":
        raise Exception("Must specify file, data or redirect keywords")

    if "TargetFilename" in kw:
        opts["TargetFilename"] = kw["TargetFilename"]

    opts['timeout'] = int(kw.get("timeout", ONE_YEAR))

    # ---------------------------------
    # now dispatch the job
    return self._submitCmd(jobId, "ClientPut", **opts)
738
739
740
742 """
743 Inserts a freesite
744
745 Arguments:
746 - uri - uri under which to insert the key
747
748 Keywords:
749 - dir - the directory to insert - mandatory, no default.
750 This directory must contain a toplevel index.html file
751 - name - the name of the freesite, defaults to 'freesite'
752 - usk - set to True to insert as USK (Default false)
753 - version - the USK version number, default 0
754
755 - filebyfile - default False - if True, manually inserts
756 each constituent file, then performs the ClientPutComplexDir
757 as a manifest full of redirects. You *must* use this mode
758 if inserting from across a LAN
759
760 - maxretries - maximum number of retries, default 3
761 - priority - the PriorityClass for the insert, default 4, may be between
762 0 (highest) to 6 (lowest)
763
764 - id - the job identifier, for persistent requests
765 - async - default False - if True, return immediately with a job ticket
766 - persistence - default 'connection' - the kind of persistence for
767 this request. If 'reboot' or 'forever', this job will be able to
768 be recalled in subsequent FCP sessions. Other valid values are
769 'reboot' and 'forever', as per FCP spec
770 - Global - default false - if evaluates to true, puts this request
771 on the global queue. Note the capital G in Global. If you set this,
772 persistence must be 'reboot' or 'forever'
773 - Verbosity - default 0 - sets the Verbosity mask passed in the
774 FCP message - case-sensitive
775 - allatonce - default False - if set, and if filebyfile is set, then
776 all files of the site will be inserted simultaneously, which can give
777 a nice speed-up for small to moderate sites, but cruel choking on
778 large sites; use with care
779 - globalqueue - perform the inserts on the global queue, which will
780 survive node reboots
781
782 - timeout - timeout for completion, in seconds, default one year
783
784
785 Returns:
786 - the URI under which the freesite can be retrieved
787 """
788 log = self._log
789 log(INFO, "putdir: uri=%s dir=%s" % (uri, kw['dir']))
790
791
792
793
794
795
796 chkonly = False
797
798
799
800 dir = kw['dir']
801 sitename = kw.get('name', 'freesite')
802 usk = kw.get('usk', False)
803 version = kw.get('version', 0)
804 maxretries = kw.get('maxretries', 3)
805 priority = kw.get('priority', 4)
806 Verbosity = kw.get('Verbosity', 0)
807
808 filebyfile = kw.get('filebyfile', False)
809
810
811
812
813 if kw.has_key('allatonce'):
814 allAtOnce = kw['allatonce']
815 filebyfile = True
816 else:
817 allAtOnce = False
818
819 if kw.has_key('maxconcurrent'):
820 maxConcurrent = kw['maxconcurrent']
821 filebyfile = True
822 allAtOnce = True
823 else:
824 maxConcurrent = 10
825
826 if kw.get('globalqueue', False):
827 globalMode = True
828 globalWord = "true"
829 persistence = "forever"
830 else:
831 globalMode = False
832 globalWord = "false"
833 persistence = "connection"
834
835 id = kw.pop("id", None)
836 if not id:
837 id = self._getUniqueId()
838
839 codecs = kw.get('Codecs',
840 self.defaultCompressionCodecsString())
841
842
843 uriFull = uri + sitename + "/"
844 if kw.get('usk', False):
845 uriFull += "%d/" % int(version)
846 uriFull = uriFull.replace("SSK@", "USK@")
847 while uriFull.endswith("/"):
848 uriFull = uriFull[:-1]
849
850 manifestDict = kw.get('manifest', None)
851
852
853
854
855
856
857
858
859 if manifestDict:
860
861
862
863 manifest = []
864 for relpath, attrDict in manifestDict.items():
865 if attrDict['changed'] or (relpath == "index.html"):
866 attrDict['relpath'] = relpath
867 attrDict['fullpath'] = os.path.join(dir, relpath)
868 manifest.append(attrDict)
869 else:
870
871
872 manifest = readdir(kw['dir'])
873 manifestDict = {}
874 for rec in manifest:
875 manifestDict[rec['relpath']] = rec
876
877
878
879
880
881
882
883 if 0:
884
885
886
887
888
889 log(INFO, "putdir: determining chks for all files")
890
891 for filerec in manifest:
892
893
894 relpath = filerec['relpath']
895 fullpath = filerec['fullpath']
896 mimetype = filerec['mimetype']
897
898
899 raw = file(fullpath, "rb").read()
900
901
902 uri = self.put("CHK@",
903 data=raw,
904 mimetype=mimetype,
905 Verbosity=Verbosity,
906 chkonly=True,
907 priority=priority,
908 )
909
910 if uri != filerec.get('uri', None):
911 filerec['changed'] = True
912 filerec['uri'] = uri
913
914 log(INFO, "%s -> %s" % (relpath, uri))
915
916
917
918
919
920
921 if filebyfile:
922
923
924
925
926 msgLines = ["ClientPutComplexDir",
927 "Identifier=%s" % id,
928 "Verbosity=%s" % Verbosity,
929 "MaxRetries=%s" % maxretries,
930 "PriorityClass=%s" % priority,
931 "URI=%s" % uriFull,
932 "Codecs=%s" % codecs,
933
934 "DefaultName=index.html",
935 ]
936
937 if globalMode:
938 msgLines.extend([
939 "Persistence=forever",
940 "Global=true",
941 ])
942 else:
943 msgLines.extend([
944 "Persistence=connection",
945 "Global=false",
946 ])
947
948
949 n = 0
950 default = None
951 for filerec in manifest:
952 relpath = filerec['relpath']
953 mimetype = filerec['mimetype']
954
955 log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))
956
957 msgLines.extend(["Files.%d.Name=%s" % (n, relpath),
958 "Files.%d.UploadFrom=redirect" % n,
959 "Files.%d.TargetURI=%s" % (n, filerec['uri']),
960 ])
961 n += 1
962
963
964 msgLines.append("EndMessage")
965 manifestInsertCmdBuf = "\n".join(msgLines) + "\n"
966
967
968 for line in msgLines:
969 log(DETAIL, line)
970
971
972
973
974
975
976
977
978
979
980
981
982
983 jobs = []
984
985
986 if filebyfile:
987
988 log(INFO, "putdir: starting file-by-file inserts")
989
990 lastProgressMsgTime = time.time()
991
992
993 nTotal = len(manifest)
994
995
996 while True:
997
998 nQueued = len(jobs)
999 nComplete = len(
1000 filter(
1001 lambda j: j.isComplete(),
1002 jobs
1003 )
1004 )
1005 nWaiting = nTotal - nQueued
1006 nInserting = nQueued - nComplete
1007
1008
1009 now = time.time()
1010 if now - lastProgressMsgTime >= 10:
1011 lastProgressMsgTime = time.time()
1012 log(INFO,
1013 "putdir: waiting=%s inserting=%s done=%s total=%s" % (
1014 nWaiting, nInserting, nComplete, nTotal)
1015 )
1016
1017
1018 if nComplete == nTotal:
1019 log(INFO, "putdir: all inserts completed (or failed)")
1020 break
1021
1022
1023 if nInserting >= maxConcurrent:
1024 time.sleep(_pollInterval)
1025 continue
1026
1027
1028 if len(manifest) == 0:
1029 time.sleep(_pollInterval)
1030 continue
1031
1032
1033 filerec = manifest.pop(0)
1034 relpath = filerec['relpath']
1035 fullpath = filerec['fullpath']
1036 mimetype = filerec['mimetype']
1037
1038
1039
1040 log(INFO, "Launching insert of %s" % relpath)
1041
1042
1043
1044
1045 raw = file(fullpath, "rb").read()
1046
1047 print "globalMode=%s persistence=%s" % (globalMode, persistence)
1048
1049
1050 job = self.put("CHK@",
1051 data=raw,
1052 mimetype=mimetype,
1053 async=1,
1054 waituntilsent=1,
1055 Verbosity=Verbosity,
1056 chkonly=chkonly,
1057 priority=priority,
1058 Global=globalMode,
1059 Persistence=persistence,
1060 )
1061 jobs.append(job)
1062 filerec['job'] = job
1063 job.filerec = filerec
1064
1065
1066 if not allAtOnce:
1067 job.wait()
1068 log(INFO, "Insert finished for %s" % relpath)
1069
1070
1071 log(INFO, "All raw files now inserted (or failed)")
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081 msgLines = ["ClientPutComplexDir",
1082 "Identifier=%s" % id,
1083 "Verbosity=%s" % Verbosity,
1084 "MaxRetries=%s" % maxretries,
1085 "PriorityClass=%s" % priority,
1086 "URI=%s" % uriFull,
1087 "Codecs=%s" % codecs,
1088
1089 "DefaultName=index.html",
1090 ]
1091
1092 if kw.get('Global', False):
1093 msgLines.extend([
1094 "Persistence=forever",
1095 "Global=true",
1096 ])
1097 else:
1098 msgLines.extend([
1099 "Persistence=connection",
1100 "Global=false",
1101 ])
1102
1103
1104 n = 0
1105 default = None
1106 for job in jobs:
1107 filerec = job.filerec
1108 relpath = filerec['relpath']
1109 fullpath = filerec['fullpath']
1110 mimetype = filerec['mimetype']
1111
1112
1113 if filebyfile:
1114 if isinstance(filerec['job'].result, Exception):
1115 log(ERROR, "File %s failed to insert" % relpath)
1116 continue
1117
1118 log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))
1119
1120 msgLines.extend(["Files.%d.Name=%s" % (n, relpath),
1121 ])
1122 if filebyfile:
1123
1124 uri = job.result
1125 if not uri:
1126 raise Exception("Can't find a URI for file %s" % filerec['relpath'])
1127
1128 msgLines.extend(["Files.%d.UploadFrom=redirect" % n,
1129 "Files.%d.TargetURI=%s" % (n, uri),
1130 ])
1131 else:
1132 msgLines.extend(["Files.%d.UploadFrom=disk" % n,
1133 "Files.%d.Filename=%s" % (n, fullpath),
1134 ])
1135 n += 1
1136
1137
1138 msgLines.append("EndMessage")
1139 manifestInsertCmdBuf = "\n".join(msgLines) + "\n"
1140
1141
1142 for line in msgLines:
1143 log(DETAIL, line)
1144
1145
1146
1147
1148
1149
1150
1151
1152 if chkonly:
1153 finalResult = "no_uri"
1154 else:
1155 finalResult = self._submitCmd(
1156 id, "ClientPutComplexDir",
1157 rawcmd=manifestInsertCmdBuf,
1158 async=kw.get('async', False),
1159 waituntilsent=kw.get('waituntilsent', False),
1160 callback=kw.get('callback', False),
1161
1162 )
1163
1164
1165
1166
1167
1168 return finalResult
1169
1171 """
1172 Modifies node configuration
1173
1174 Keywords:
1175 - async - whether to do this call asynchronously, and
1176 return a JobTicket object
1177 - callback - if given, this should be a callable which accepts 2
1178 arguments:
1179 - status - will be one of 'successful', 'failed' or 'pending'
1180 - value - depends on status:
1181 - if status is 'successful', this will contain the value
1182 returned from the command
1183 - if status is 'failed' or 'pending', this will contain
1184 a dict containing the response from node
1185 - keywords, which are the same as for the FCP message and documented in the wiki: http://wiki.freenetproject.org/FCP2p0ModifyConfig
1186 """
1187 return self._submitCmd("__global", "ModifyConfig", **kw)
1188
1189
1190
1192 """
1193 Gets node configuration
1194
1195 Keywords:
1196 - async - whether to do this call asynchronously, and
1197 return a JobTicket object
1198 - callback - if given, this should be a callable which accepts 2
1199 arguments:
1200 - status - will be one of 'successful', 'failed' or 'pending'
1201 - value - depends on status:
1202 - if status is 'successful', this will contain the value
1203 returned from the command
1204 - if status is 'failed' or 'pending', this will contain
1205 a dict containing the response from node
1206 - WithCurrent - default False - if True, the current configuration settings will be returned in the "current" tree of the ConfigData message fieldset
1207 - WithShortDescription - default False - if True, the configuration setting short descriptions will be returned in the "shortDescription" tree of the ConfigData message fieldset
1208 - other keywords, which are the same as for the FCP message and documented in the wiki: http://wiki.freenetproject.org/FCP2p0GetConfig
1209 """
1210
1211 return self._submitCmd("__global", "GetConfig", **kw)
1212
1213
1214
1216 """
1217 Converts an SSK or USK private key to a public equivalent
1218 """
1219 privatekey = privatekey.strip().split("freenet:")[-1]
1220
1221 isUsk = privatekey.startswith("USK@")
1222
1223 if isUsk:
1224 privatekey = privatekey.replace("USK@", "SSK@")
1225
1226 bits = privatekey.split("/", 1)
1227 mainUri = bits[0]
1228
1229 uri = self.put(mainUri+"/foo", data="bar", chkonly=1)
1230
1231 uri = uri.split("/")[0]
1232 uri = "/".join([uri] + bits[1:])
1233
1234 if isUsk:
1235 uri = uri.replace("SSK@", "USK@")
1236
1237 return uri
1238
1239
1240
def redirect(self, srcKey, destKey, **kw):
    """
    Inserts key srcKey, as a redirect to destKey.

    Arguments:
        - srcKey - the key to insert; must be a KSK, or a path-less SSK
          or USK (and not a CHK)
        - destKey - the target URI the inserted key should redirect to

    Keywords:
        - any further keywords are handed unchanged to put()

    Returns whatever put() returns for the insert
    """
    return self.put(srcKey, redirect=destKey, **kw)
1249
1250
1251
def genchk(self, **kw):
    """
    Compute the CHK URI a data item would get, without inserting it.

    Keywords - you must specify one of the following:
        - file - path of file from which to read the key data
        - data - the raw data of the key as string

    Keywords - optional:
        - mimetype - defaults to text/plain - THIS AFFECTS THE CHK!!

    Implemented as self.put(chkonly=True, ...).
    """
    chkUri = self.put(chkonly=True, **kw)
    return chkUri
1265
1266
1267
def listpeers(self, **kw):
    """
    Ask the node for its list of peers (FCP 'ListPeers').

    Keywords:
        - async - if true, run asynchronously and return a JobTicket
        - callback - optional callable taking (status, value):
            - status is one of 'successful', 'failed' or 'pending'
            - value is the command's return value when successful,
              otherwise a dict holding the node's response
        - WithMetadata - default False - include each peer's metadata
        - WithVolatile - default False - include each peer's volatile info
    """
    reply = self._submitCmd("__global", "ListPeers", **kw)
    return reply
1288
1289
1290
def listpeernotes(self, **kw):
    """
    Ask the node for the peer notes of one peer (FCP 'ListPeerNotes').

    Keywords:
        - async - if true, run asynchronously and return a JobTicket
        - callback - optional callable taking (status, value):
            - status is one of 'successful', 'failed' or 'pending'
            - value is the command's return value when successful,
              otherwise a dict holding the node's response
        - NodeIdentifier - one of name, identity or IP:port for the
          desired peer
    """
    reply = self._submitCmd("__global", "ListPeerNotes", **kw)
    return reply
1310
1311
1312
def refstats(self, **kw):
    """
    Fetch the node reference and, optionally, node statistics
    (FCP 'GetNode').

    Keywords:
        - async - if true, run asynchronously and return a JobTicket
        - callback - optional callable taking (status, value):
            - status is one of 'successful', 'failed' or 'pending'
            - value is the command's return value when successful,
              otherwise a dict holding the node's response
        - GiveOpennetRef - default False - return the Opennet reference
          rather than the Darknet reference
        - WithPrivate - default False - include the private node
          reference fields
        - WithVolatile - default False - include the node's volatile info
    """
    reply = self._submitCmd("__global", "GetNode", **kw)
    return reply
1334
1335
1336
1338 """
1339 Test for Direct Disk Access capability on a directory (can the node and the FCP client both access the same directory?)
1340
1341 Keywords:
1342 - async - whether to do this call asynchronously, and
1343 return a JobTicket object
1344 - callback - if given, this should be a callable which accepts 2
1345 arguments:
1346 - status - will be one of 'successful', 'failed' or 'pending'
1347 - value - depends on status:
1348 - if status is 'successful', this will contain the value
1349 returned from the command
1350 - if status is 'failed' or 'pending', this will contain
1351 a dict containing the response from node
1352 - Directory - directory to test
1353 - WithReadDirectory - default False - if True, want node to read from directory for a put operation
1354 - WithWriteDirectory - default False - if True, want node to write to directory for a get operation
1355 """
1356
1357 DDAkey = (kw["Directory"], kw["WithReadDirectory"], kw["WithWriteDirectory"])
1358 try:
1359 return self.testedDDA[DDAkey]
1360 except KeyError:
1361 pass
1362 requestResult = self._submitCmd("__global", "TestDDARequest", **kw)
1363 writeFilename = None;
1364 kw = {};
1365 kw[ 'Directory' ] = requestResult[ 'Directory' ];
1366 if( requestResult.has_key( 'ReadFilename' )):
1367 readFilename = requestResult[ 'ReadFilename' ];
1368 readFile = open( readFilename, 'rb' );
1369 readFileContents = readFile.read();
1370 readFile.close();
1371 kw[ 'ReadFilename' ] = readFilename;
1372 kw[ 'ReadContent' ] = readFileContents;
1373
1374 if( requestResult.has_key( 'WriteFilename' ) and requestResult.has_key( 'ContentToWrite' )):
1375 writeFilename = requestResult[ 'WriteFilename' ];
1376 contentToWrite = requestResult[ 'ContentToWrite' ];
1377 writeFile = open( writeFilename, 'w+b' );
1378 writeFileContents = writeFile.write( contentToWrite );
1379 writeFile.close();
1380 writeFileStatObject = os.stat( writeFilename );
1381 writeFileMode = writeFileStatObject.st_mode;
1382 os.chmod( writeFilename, writeFileMode | stat.S_IREAD | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH );
1383
1384 responseResult = self._submitCmd("__global", "TestDDAResponse", **kw)
1385 if( None != writeFilename ):
1386 try:
1387 os.remove( writeFilename );
1388 except OSError, msg:
1389 pass;
1390
1391 self.testedDDA[DDAkey] = responseResult
1392 return responseResult;
1393
1394
1395
def addpeer(self, **kw):
    """
    Add a peer to the node (FCP 'AddPeer').

    Keywords:
        - async - if true, run asynchronously and return a JobTicket
        - callback - optional callable taking (status, value):
            - status is one of 'successful', 'failed' or 'pending'
            - value is the command's return value when successful,
              otherwise a dict holding the node's response
        - File - filepath of a file containing a noderef in the node's
          directory
        - URL - URL of a copy of a peer's noderef to add
        - kwdict - if neither File nor URL is given, the noderef fields
          can be passed as a Python dictionary under this keyword
    """
    reply = self._submitCmd("__global", "AddPeer", **kw)
    return reply
1417
1418
1419
def listpeer(self, **kw):
    """
    Query the node about a single peer (FCP 'ListPeer').

    Keywords:
        - async - if true, run asynchronously and return a JobTicket
        - callback - optional callable taking (status, value):
            - status is one of 'successful', 'failed' or 'pending'
            - value is the command's return value when successful,
              otherwise a dict holding the node's response
        - NodeIdentifier - one of name (except for opennet peers),
          identity or IP:port for the desired peer
    """
    return self._submitCmd("__global", "ListPeer", **kw)
1439
1440
1441
def modifypeer(self, **kw):
    """
    Change settings of one of the node's peers (FCP 'ModifyPeer').

    Keywords:
        - async - if true, run asynchronously and return a JobTicket
        - callback - optional callable taking (status, value):
            - status is one of 'successful', 'failed' or 'pending'
            - value is the command's return value when successful,
              otherwise a dict holding the node's response
        - IsDisabled - default False - enables or disables the peer
          accordingly
        - IsListenOnly - default False - sets ListenOnly on the peer
        - NodeIdentifier - one of name, identity or IP:port for the
          desired peer
    """
    reply = self._submitCmd("__global", "ModifyPeer", **kw)
    return reply
1463
1464
1465
def modifypeernote(self, **kw):
    """
    Set a peer note on one of the node's peers (FCP 'ModifyPeerNote').

    Keywords:
        - async - if true, run asynchronously and return a JobTicket
        - callback - optional callable taking (status, value):
            - status is one of 'successful', 'failed' or 'pending'
            - value is the command's return value when successful,
              otherwise a dict holding the node's response
        - NodeIdentifier - one of name, identity or IP:port for the
          desired peer
        - NoteText - base64 encoded string of the desired peer note text
        - PeerNoteType - code number of peer note type: currently only
          private peer note is supported by the node with code number 1
    """
    return self._submitCmd("__global", "ModifyPeerNote", **kw)
1487
1488
1489
def removepeer(self, **kw):
    """
    Remove a peer from the node (FCP 'RemovePeer').

    Keywords:
        - async - if true, run asynchronously and return a JobTicket
        - callback - optional callable taking (status, value):
            - status is one of 'successful', 'failed' or 'pending'
            - value is the command's return value when successful,
              otherwise a dict holding the node's response
        - NodeIdentifier - one of name, identity or IP:port for the
          desired peer
    """
    reply = self._submitCmd("__global", "RemovePeer", **kw)
    return reply
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
def namesiteInit(self, path=None):
    """
    Set up the namesites layer and load (or create) the namesites file.

    Arguments:
        - path - location of the namesites file; when empty, the file
          '.freenames' in the user's home directory is used

    Resets self.namesiteLocals and self.namesitePeers to empty lists,
    then loads the file if it exists, or writes a fresh one if not.
    """
    self.namesiteFile = path or os.path.join(os.path.expanduser("~"), ".freenames")

    self.namesiteLocals = []
    self.namesitePeers = []

    if not os.path.isfile(self.namesiteFile):
        # nothing on disk yet - create the file
        self.namesiteSave()
    else:
        self.namesiteLoad()
1536
1537
1538
def namesiteLoad(self):
    """
    Load the namesites list from self.namesiteFile.

    The file is parsed with pseudopythonparser; its 'locals' and 'peers'
    entries replace self.namesiteLocals and self.namesitePeers.

    This is best-effort: if reading or parsing fails, the traceback is
    printed and the lists keep their previous values.
    """
    try:
        parser = pseudopythonparser.Parser()
        # close the file promptly, whatever the parser does
        with open(self.namesiteFile) as f:
            env = parser.parse(f.read())
        self.namesiteLocals = env['locals']
        self.namesitePeers = env['peers']
    except Exception:
        traceback.print_exc()
1550
1551
1552
def namesiteSave(self):
    """
    Save the namesites list to self.namesiteFile.

    The file holds a header comment followed by pretty-printed
    'locals = ...' and 'peers = ...' assignments. The file is closed
    even if writing fails part way.
    """
    with open(self.namesiteFile, "w") as f:
        f.write("# pyFreenet namesites registration file\n\n")

        pp = pprint.PrettyPrinter(width=72, indent=2, stream=f)

        f.write("locals = ")
        pp.pprint(self.namesiteLocals)
        f.write("\n")

        f.write("peers = ")
        pp.pprint(self.namesitePeers)
        f.write("\n")
1572
1573
1574
def namesiteAddLocal(self, name, privuri=None):
    """
    Create a new nameservice that we own.

    Arguments:
        - name - name of the new local service
        - privuri - private key URI; when empty, a new key is generated
          with self.genkey()

    Appends a record (name, privuri, puburi, empty cache) to
    self.namesiteLocals and saves the namesites file.

    Raises Exception if a local service of that name already exists.
    """
    privuri = privuri or self.genkey()[1]
    rawPublic = self.invertprivate(privuri)

    privuri = self.namesiteProcessUri(privuri)
    puburi = self.namesiteProcessUri(rawPublic)

    if any(existing['name'] == name for existing in self.namesiteLocals):
        raise Exception("Already got a local service called '%s'" % name)

    record = {
        'name': name,
        'privuri': privuri,
        'puburi': puburi,
        'cache': {},
    }
    self.namesiteLocals.append(record)

    self.namesiteSave()
1598
1599
1600
def namesiteDelLocal(self, name):
    """
    Delete a local nameservice.

    Arguments:
        - name - name of the local service to remove

    Every record in self.namesiteLocals with that name is dropped, then
    the namesites file is saved. An unknown name is not an error.
    """
    # Rebuild in place instead of calling remove() while iterating:
    # removing during iteration skips the element after each hit.
    self.namesiteLocals[:] = [
        rec for rec in self.namesiteLocals if rec['name'] != name
    ]

    self.namesiteSave()
1611
1612
1613
def namesiteAddRecord(self, localname, domain, uri):
    """
    Add a (domainname -> uri) record to one of our local services.

    Arguments:
        - localname - name of the local service to add to
        - domain - the domain name to register
        - uri - the URI the domain should map to

    If the service's cache already maps domain to uri, nothing happens.
    Otherwise the cache is updated and saved, the record is inserted
    under the service's private URI via self.put() (asynchronously), and
    the persistent requests are refreshed.

    Raises Exception if there is no local service called localname.
    """
    matches = [r for r in self.namesiteLocals if r['name'] == localname]
    # as in a plain scan of the list, the last matching record wins
    rec = matches[-1] if matches else None
    if not rec:
        raise Exception("No local service '%s'" % localname)

    cache = rec['cache']

    # already known - nothing to do
    if cache.get(domain, None) == uri:
        return

    cache[domain] = uri
    self.namesiteSave()

    localPrivUri = rec['privuri'] + "/" + domain + "/0"
    jobId = "namesite|%s|%s|%s" % (localname, domain, int(time.time()))

    # 'async' is passed through a dict so this also parses on newer Pythons
    self.put(
        localPrivUri,
        id=jobId,
        data=uri,
        persistence="forever",
        Global=True,
        priority=0,
        **{'async': True}
        )

    self.refreshPersistentRequests()
1654
1655
1656
def namesiteDelRecord(self, localname, domain):
    """
    Remove a domainname record from one of our local services.

    Arguments:
        - localname - name of the local service
        - domain - the domain name to forget

    The domain is dropped from the cache of every local service called
    localname (if present), then the namesites file is saved.
    """
    for service in self.namesiteLocals:
        if service['name'] != localname:
            continue
        service['cache'].pop(domain, None)

    self.namesiteSave()
1669
1670
1671
def namesiteAddPeer(self, name, uri):
    """
    Add a peer namesite to our list.

    Arguments:
        - name - the name under which to register the peer namesite
        - uri - public USK of the namesite; an optional 'freenet:'
          prefix is ignored and anything after the first '/' is dropped

    Raises Exception if the URI does not start with 'USK', or if a peer
    namesite of that name already exists.
    """
    bare = uri.split("freenet:")[-1]

    if not bare.startswith("USK"):
        raise Exception("Invalid URI %s, should be a public USK" % bare)

    # keep only the key part
    keyPart = bare.split("/")[0]

    if self.namesiteHasPeer(name):
        raise Exception("Peer nameservice '%s' already exists" % name)

    self.namesitePeers.append({'name': name, 'puburi': keyPart})

    self.namesiteSave()
1693
1694
1695
def namesiteHasPeer(self, name):
    """
    Tell whether a peer namesite of the given name is registered.

    Returns True if self.namesiteGetPeer(name) finds a record.
    """
    found = self.namesiteGetPeer(name)
    return found is not None
1701
1702
1703
def namesiteGetPeer(self, name):
    """
    Look up the record of a peer namesite.

    Returns the first record in self.namesitePeers whose 'name' equals
    name, or None if there is none.
    """
    return next(
        (peerRec for peerRec in self.namesitePeers if peerRec['name'] == name),
        None)
1712
1713
1714
def namesiteRemovePeer(self, name):
    """
    Remove a peer namesite from our list.

    Arguments:
        - name - name of the peer namesite to remove

    Every record in self.namesitePeers with that name is dropped, then
    the namesites file is saved. An unknown name is not an error.
    """
    # Rebuild in place instead of calling remove() while iterating:
    # removing during iteration skips the element after each hit.
    self.namesitePeers[:] = [
        rec for rec in self.namesitePeers if rec['name'] != name
    ]

    self.namesiteSave()
1724
1725
1726
def namesiteLookup(self, domain, **kw):
    """
    Attempts a lookup of a given 'domain name' on our designated
    namesites

    Arguments:
        - domain - the domain to look up

    Keywords:
        - localonly - whether to only search local cache
        - peer - if given, search only that peer's namesite (not locals)

    Returns the target URI, or None if no namesite knows the domain.
    The namesites file is reloaded before every lookup.
    """
    self.namesiteLoad()

    localonly = kw.get('localonly', False)
    peer = kw.get('peer', None)

    if not peer:
        # try the caches of our own services first
        for rec in self.namesiteLocals:
            if domain in rec['cache']:
                return rec['cache'][domain]

        if localonly:
            return None

    for rec in self.namesitePeers:
        if peer and (peer != rec['name']):
            continue

        uri = rec['puburi'] + "/" + domain + "/0"

        try:
            fetched = self.get(uri)
            # Successful fetches are delivered as (mimetype, data, msg),
            # so take the data by position rather than unpacking a pair.
            # NOTE(review): self.get() is not visible here - confirm shape.
            return fetched[1]
        except Exception:
            # best effort: a failed fetch just means "try the next peer"
            pass

    return None
1768
1769
1770
def namesiteProcessUri(self, uri):
    """
    Reduce a URI to its bare 'USK@...' key part.

    An optional 'freenet:' prefix is stripped, 'SSK@' is rewritten to
    'USK@', and everything from the first '/' on is dropped.
    """
    stripped = uri.split("freenet:")[-1]
    asUsk = stripped.replace("SSK@", "USK@")
    keyOnly = asUsk.split("/")[0]

    if not keyOnly.startswith("USK@"):
        # NOTE(review): 'usage' is not defined in the visible part of
        # this file - confirm it exists, else this raises NameError
        usage("Bad uri %s" % uri)

    return keyOnly
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
def listenGlobal(self, **kw):
    """
    Start listening on the global queue.

    Sends 'WatchGlobal' with Enabled="true"; extra keywords are passed
    along with the command.
    """
    self._submitCmd(None, "WatchGlobal", Enabled="true", **kw)
1801
1802
1803
def ignoreGlobal(self, **kw):
    """
    Stop listening on the global queue.

    Sends 'WatchGlobal' with Enabled="false"; extra keywords are passed
    along with the command.
    """
    self._submitCmd(None, "WatchGlobal", Enabled="false", **kw)
1809
1810
1811
def purgePersistentJobs(self):
    """
    Cancel every persistent job in one go.

    Calls cancel() on each job returned by self.getPersistentJobs().
    """
    doomed = self.getPersistentJobs()
    for ticket in doomed:
        ticket.cancel()
1818
1819
1820
def getAllJobs(self):
    """
    Returns all jobs currently held in self.jobs, whatever their
    persistence or global status.
    """
    return self.jobs.values()
1826
1827
1828
def getPersistentJobs(self):
    """
    Returns a list of the jobs that are persistent but not global.
    """
    selected = []
    for ticket in self.jobs.values():
        if ticket.isPersistent and not ticket.isGlobal:
            selected.append(ticket)
    return selected
1834
1835
1836
def getGlobalJobs(self):
    """
    Returns a list of the jobs that are flagged as global.
    """
    selected = []
    for ticket in self.jobs.values():
        if ticket.isGlobal:
            selected.append(ticket)
    return selected
1842
1843
1844
def getTransientJobs(self):
    """
    Returns a list of the jobs that are not persistent.

    Only the persistence flag is tested; a job's global flag is not
    looked at here.
    """
    selected = []
    for ticket in self.jobs.values():
        if not ticket.isPersistent:
            selected.append(ticket)
    return selected
1850
1851
1852
def refreshPersistentRequests(self, **kw):
    """
    Send a ListPersistentRequests to the node, so that our records of
    persistent requests are up to date.

    The original author notes that the node already sends the list of
    outstanding persistent requests upon connection, and that this
    method was added for FCP spec compliance.

    Keywords:
        - async - whether to return a JobTicket instead of the result,
          default False
        - callback - optional callback, passed through to the command

    Raises Exception if a job is already registered under the
    identifier '__global'.
    """
    self._log(DETAIL, "listPersistentRequests")

    if '__global' in self.jobs:
        raise Exception("An existing non-identifier job is currently pending")

    globalId = '__global'
    opts = {
        'Identifier': globalId,
        'async': kw.pop('async', False),
    }
    if 'callback' in kw:
        opts['callback'] = kw['callback']

    return self._submitCmd(globalId, "ListPersistentRequests", **opts)
1882
1883
1884
def clearGlobalJob(self, id):
    """
    Remove a job from the jobs queue.

    Submits 'RemovePersistentRequest' for the given identifier with
    Global=True, asynchronously, blocking only until it has been sent.
    """
    # 'async' is passed through a dict so this also parses on newer Pythons
    flags = {'async': True, 'waituntilsent': True}
    self._submitCmd(id, "RemovePersistentRequest",
                    Identifier=id, Global=True, **flags)
1891
1892
1893
def getSocketTimeout(self):
    """
    Gets the socketTimeout for future socket calls;
    returns None if not supported by Python version

    Any error while asking the socket also yields None.
    """
    try:
        timeout = self.socket.gettimeout()
    except Exception:
        # deliberately best-effort
        timeout = None
    return timeout
1905
1906
1907
def setSocketTimeout(self, socketTimeout):
    """
    Sets the socketTimeout for future socket calls

    The value is always stored in self.socketTimeout; applying it to
    the socket is best-effort and errors there are ignored.

    >>> node = FCPNode()
    >>> timeout = node.getSocketTimeout()
    >>> newtimeout = 1800
    >>> node.setSocketTimeout(newtimeout)
    >>> node.getSocketTimeout()
    1800.0
    """
    self.socketTimeout = socketTimeout
    try:
        self.socket.settimeout(socketTimeout)
    except Exception:
        # deliberately best-effort
        pass
1925
1926
1927
def getVerbosity(self):
    """
    Gets the verbosity for future logging calls

    >>> node = FCPNode()
    >>> node.getVerbosity() # default
    3
    >>> node.setVerbosity(INFO)
    >>> node.getVerbosity()
    4
    """
    level = self.verbosity
    return level
1940
1941
1942
def setVerbosity(self, verbosity):
    """
    Sets the verbosity for future logging calls

    Arguments:
        - verbosity - the new level, stored in self.verbosity
    """
    setattr(self, 'verbosity', verbosity)
1948
1949
1950
def shutdown(self):
    """
    Terminates the manager thread

    You should explicitly shutdown any existing nodes
    before exiting your client application

    Clears self.running, waits for the manager thread to let go of
    self.shutdownLock, then closes the socket (unless self.noCloseSocket
    is set) and the logfile (unless it is stdout or stderr). Calling it
    again on a node that is not running returns immediately.
    """
    log = self._log

    log(DETAIL, "shutdown: entered")
    if not self.running:
        log(DETAIL, "shutdown: already shut down")
        return

    self.running = False

    # give the manager thread a chance to notice the flag
    time.sleep(pollTimeout * 3)

    # the manager thread holds this lock for as long as it runs
    log(DETAIL, "shutdown: waiting for manager thread to terminate")
    self.shutdownLock.acquire()
    log(DETAIL, "shutdown: manager thread terminated")

    if hasattr(self, 'socket') and not self.noCloseSocket:
        self.socket.close()
        del self.socket

    logfile = self.logfile
    if logfile is not None and logfile not in [sys.stdout, sys.stderr]:
        logfile.close()

    # NOTE(review): this is logged after the logfile has been closed -
    # confirm self._log copes with a closed file
    log(DETAIL, "shutdown: done?")
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
def _mgrThread(self):
    """
    This thread is the nucleus of pyFreenet, and coordinates incoming
    client commands and incoming node responses

    While self.running is set, each pass first handles one incoming node
    message (if any is waiting), then waits up to pollTimeout for one
    client request and dispatches it.

    self.shutdownLock is held for the whole life of the thread and is
    always released on exit, so shutdown() cannot block forever.

    If the loop dies with an exception, that exception is delivered as
    the result of every registered job and of every request still
    sitting in the client queue.
    """
    log = self._log

    self.shutdownLock.acquire()
    try:
        log(DETAIL, "FCPNode: manager thread starting")
        try:
            while self.running:

                log(NOISY, "_mgrThread: Top of manager thread")

                # anything coming in from the node?
                log(NOISY, "_mgrThread: Testing for incoming message")
                if self._msgIncoming():
                    log(DEBUG, "_mgrThread: Retrieving incoming message")
                    msg = self._rxMsg()
                    log(DEBUG, "_mgrThread: Got incoming message, dispatching")
                    self._on_rxMsg(msg)
                    log(DEBUG, "_mgrThread: back from on_rxMsg")
                else:
                    log(NOISY, "_mgrThread: No incoming message from node")

                # anything coming in from the client?
                log(NOISY, "_mgrThread: Testing for client req")
                try:
                    req = self.clientReqQueue.get(True, pollTimeout)
                    log(DEBUG, "_mgrThread: Got client req, dispatching")
                    self._on_clientReq(req)
                    log(DEBUG, "_mgrThread: Back from on_clientReq")
                except Queue.Empty:
                    log(NOISY, "_mgrThread: No incoming client req")

            log(DETAIL, "_mgrThread: Manager thread terminated normally")

        except Exception as e:
            traceback.print_exc()
            log(CRITICAL, "_mgrThread: manager thread crashed")

            # wake everybody waiting on a registered job
            for job in list(self.jobs.values()):
                job._putResult(e)

            # ...and everybody whose request never got dispatched
            while True:
                try:
                    job = self.clientReqQueue.get(True, pollTimeout)
                    job._putResult(e)
                except Queue.Empty:
                    log(NOISY, "_mgrThread: No incoming client req")
                    break
    finally:
        # released even if the crash handling above fails itself
        self.shutdownLock.release()
2055
2056
2057
def _msgIncoming(self):
    """
    Returns True if a message is coming in from the node

    Waits up to pollTimeout seconds for self.socket to become readable.
    """
    readable, _, _ = select.select([self.socket], [], [], pollTimeout)
    return len(readable) > 0
2063
2064
2065
def _submitCmd(self, id, cmd, **kw):
    """
    Queue a command for the manager thread and, unless asked otherwise,
    wait for its result.

    Arguments:
        - id - the command identifier
        - cmd - the command name, such as 'ClientPut'

    Keywords:
        - async - whether to return a JobTicket object, rather than
          the command result
        - callback - a function taking 2 args 'status' and 'value'.
          Status is one of 'successful', 'pending' or 'failed'.
          value is the primitive return value if successful, or the raw
          node message if pending or failed
        - followRedirect - follow a redirect if true, otherwise fail the get
        - rawcmd - a raw command buffer to send directly
        - options specific to command such as 'URI'
        - timeout - timeout in seconds for job completion, default 1 year
        - waituntilsent - whether to block until this command has been sent
          to the node, default False
        - keep - whether to keep the job on our jobs list after it completes,
          default False
        - stream - passed on to the JobTicket
        - kwdict - a dict whose entries are merged into the command options

    Returns:
        - if command is sent in sync mode, returns the result
        - if command is sent in async mode, returns a JobTicket
          object which the client can poll or block on later
        - None for a synchronous 'WatchGlobal' or 'RemovePersistentRequest'

    Raises FCPNodeFailure if the node connection is no longer alive.

    >>> import fcp
    >>> n = fcp.node.FCPNode()
    >>> cmd = "ClientPut"
    >>> jobid = "id2291160822224650"
    >>> opts = {'Metadata.ContentType': 'text/html', 'async': False, 'UploadFrom': 'direct', 'Verbosity': 0, 'Global': 'false', 'URI': 'CHK@', 'keep': False, 'DontCompress': 'false', 'MaxRetries': -1, 'timeout': 31536000, 'Codecs': 'GZIP, BZIP2, LZMA, LZMA_NEW', 'GetCHKOnly': 'true', 'RealTimeFlag': 'false', 'waituntilsent': False, 'Identifier': jobid, 'Data': '<!DOCTYPE html>\\n<html>\\n<head>\\n<title>Sitemap for freenet-plugin-bare</title>\\n</head>\\n<body>\\n<h1>Sitemap for freenet-plugin-bare</h1>\\nThis listing was automatically generated and inserted by freesitemgr\\n<br><br>\\n<table cellspacing=0 cellpadding=2 border=0>\\n<tr>\\n<td><b>Size</b></td>\\n<td><b>Mimetype</b></td>\\n<td><b>Name</b></td>\\n</tr>\\n<tr>\\n<td>19211</td>\\n<td>text/html</td>\\n<td><a href="index.html">index.html</a></td>\\n</tr>\\n</table>\\n<h2>Keys of large, separately inserted files</h2>\\n<pre>\\n</pre></body></html>\\n', 'PriorityClass': 3, 'Persistence': 'connection', 'TargetFilename': 'sitemap.html'}
    >>> n._submitCmd(jobid, cmd, **opts)
    'CHK@FR~anQPhpw7lZjxl96o1b875tem~5xExPTiSa6K3Wus,yuGOWhpqFY5N9i~N4BjM0Oh6Bk~Kkb7sE4l8GAsdBEs,AAMC--8/sitemap.html'
    >>> # n._submitCmd(id=None, cmd='WatchGlobal', **{'Enabled': 'true'})

    """
    if not self.nodeIsAlive:
        raise FCPNodeFailure("%s:%s: node closed connection" % (cmd, id))

    log = self._log

    log(DEBUG, "_submitCmd: id=" + repr(id) + ", cmd=" + repr(cmd) + ", **" + repr(kw))

    # strip the options meant for us; the rest goes to the node
    wantTicket = kw.pop('async', False)
    followRedirect = kw.pop('followRedirect', True)
    stream = kw.pop('stream', None)
    waituntilsent = kw.pop('waituntilsent', False)
    keepjob = kw.pop('keep', False)
    timeout = kw.pop('timeout', ONE_YEAR)

    # fold a nested options dict into the command options
    if "kwdict" in kw:
        kw.update(kw.pop("kwdict"))

    job = JobTicket(
        self, id, cmd, kw,
        verbosity=self.verbosity, logger=self._log, keep=keepjob,
        stream=stream)

    log(DEBUG, "_submitCmd: timeout=%s" % timeout)

    job.followRedirect = followRedirect

    if cmd == 'ClientGet' and 'URI' in kw:
        job.uri = kw['URI']

    if cmd == 'ClientPut':
        job.mimetype = kw['Metadata.ContentType']

    # hand over to the manager thread
    self.clientReqQueue.put(job)

    if wantTicket:
        if waituntilsent:
            job.waitTillReqSent()
        return job

    if cmd in ['WatchGlobal', "RemovePersistentRequest"]:
        # nothing to wait for with these commands
        return

    log(DETAIL, "Waiting on job")
    return job.wait(timeout)
2155
2156
2157
def _on_clientReq(self, job):
    """
    Take a request job coming from the client, register it so the
    manager thread can match node responses to it, and transmit it to
    the fcp port.

    'WatchGlobal' jobs are transmitted without being registered.
    After sending, the job's timeQueued is stamped and its reqSentLock
    is released.
    """
    cmd = job.cmd

    if cmd != 'WatchGlobal':
        self.jobs[job.id] = job
        self._log(DEBUG, "_on_clientReq: cmd=%s id=%s lock=%s" % (
            cmd, repr(job.id), job.lock))

    # now can send, since we're the only one who will
    self._txMsg(cmd, **job.kw)

    job.timeQueued = int(time.time())

    job.reqSentLock.release()
2180
2181
2182
def _on_rxMsg(self, msg):
    """
    Handles incoming messages from node

    If an incoming message represents the termination of a command,
    the job ticket object will be notified accordingly

    Arguments:
        - msg - the node message as a dict; 'header' holds the message
          type and 'Identifier' (default '__global') selects the job

    A message for an identifier we have no job for gets a fresh
    JobTicket, registered under that identifier. Each message type is
    then answered with a job.callback(status, value) and, for final
    messages, a job._putResult(value). Unknown message types fail the
    job with an FCPException.
    """
    log = self._log

    jobId = msg.get('Identifier', '__global')
    hdr = msg['header']

    job = self.jobs.get(jobId, None)
    if not job:
        log(DETAIL, "***** Got %s from unknown job id %s" % (hdr, repr(jobId)))
        job = JobTicket(self, jobId, hdr, msg)
        self.jobs[jobId] = job

    # ---- one-shot replies: deliver the result and forget the job
    if hdr in ('SSKKeypair', 'ConfigData', 'TestDDAReply',
               'TestDDAComplete', 'NodeData'):
        if hdr == 'SSKKeypair':
            result = (msg['RequestURI'], msg['InsertURI'])
        else:
            result = msg
        job.callback('successful', result)
        job._putResult(result)
        self.jobs.pop(jobId, None)
        return

    # ---- ClientGet
    if hdr == 'DataFound':
        if 'URI' in job.kw:
            log(INFO, "Got DataFound for URI=%s" % job.kw['URI'])
        else:
            log(ERROR, "Got DataFound without URI")
        mimetype = msg['Metadata.ContentType']

        if 'Filename' in job.kw:
            # data went to a file; the result carries the filename
            result = (mimetype, job.kw['Filename'], msg)
            job.callback('successful', result)
            job._putResult(result)
            return

        if job.kw['ReturnType'] == 'none':
            result = (mimetype, 1, msg)
            job.callback('successful', result)
            job._putResult(result)
            return

        # otherwise the data itself is delivered by a later AllData
        if job.kw['ReturnType'] == 'direct' \
        and job.kw.get('Persistence', None) != 'connection':
            # persistent get: the data has to be asked for explicitly
            log(INFO, "Request was persistent")
            if not hasattr(job, "gotPersistentDataFound"):
                if job.isGlobal:
                    isGlobal = "true"
                else:
                    isGlobal = "false"
                job.gotPersistentDataFound = True
                log(INFO, " --> sending GetRequestStatus")
                self._txMsg("GetRequestStatus",
                            Identifier=job.kw['Identifier'],
                            Persistence=msg.get("Persistence", "connection"),
                            Global=isGlobal,
                            )

        job.callback('pending', msg)
        job.mimetype = mimetype
        return

    if hdr == 'ExpectedMIME':
        job.mimetype = msg['Metadata.ContentType']
        job.callback('pending', msg)
        return

    if hdr == 'AllData':
        result = (job.mimetype, msg['Data'], msg)
        job.callback('successful', result)
        job._putResult(result)
        return

    if hdr == 'GetFailed':
        # both of these failure codes carry a URI to retry with
        if job.followRedirect and msg.get('ShortCodeDescription', None) in (
                "New URI", "Too many path components"):
            uri = msg['RedirectURI']
            job.kw['URI'] = uri
            job.kw['id'] = self._getUniqueId()
            self._txMsg(job.cmd, **job.kw)
            log(DETAIL, "Redirect to %s" % uri)
            return

        job.callback("failed", msg)
        job._putResult(FCPGetFailed(msg))
        return

    # ---- ClientPut
    if hdr == 'URIGenerated':
        if 'URI' not in msg:
            log(ERROR, "message {} without 'URI'. This is very likely a bug in Freenet. Check whether you have files in uploads or downloads without URI (clickable link).".format(hdr))
        else:
            job.uri = msg['URI']
        job.callback('pending', msg)
        return

    if hdr == 'PutSuccessful':
        if 'URI' not in msg:
            log(ERROR, "message {} without 'URI'. This is very likely a bug in Freenet. Check whether you have files in uploads or downloads without URI (clickable link).".format(hdr))
        else:
            result = msg['URI']
            job._putResult(result)
            job.callback('successful', result)
        return

    if hdr == 'PutFailed':
        job.callback('failed', msg)
        job._putResult(FCPPutFailed(msg))
        return

    if hdr == 'PutFetchable':
        if 'URI' not in msg:
            log(ERROR, "message {} without 'URI'. This is very likely a bug in Freenet. Check whether you have files in uploads or downloads without URI (clickable link).".format(hdr))
        else:
            job.kw['URI'] = msg['URI']
        job.callback('pending', msg)
        return

    # ---- progress notifications: just tell the callback
    if hdr in ('CompatibilityMode', 'ExpectedDataLength', 'ExpectedHashes',
               'StartedCompression', 'FinishedCompression',
               'SimpleProgress', 'SendingToNetwork'):
        job.callback('pending', msg)
        return

    # ---- end-of-list and single-answer messages: result is job.msgs
    if hdr in ('FCPPluginReply', 'EndListPeers', 'PeerRemoved',
               'EndListPeerNotes', 'EndListPersistentRequests'):
        job._appendMsg(msg)
        job.callback('successful', job.msgs)
        job._putResult(job.msgs)
        return

    if hdr in ('UnknownNodeIdentifier', 'UnknownPeerNoteType'):
        job._appendMsg(msg)
        job.callback('failed', job.msgs)
        job._putResult(job.msgs)
        return

    # ---- list items: collected while listing, final otherwise
    if hdr in ('Peer', 'PeerNote'):
        listCmd = {'Peer': "ListPeers", 'PeerNote': "ListPeerNotes"}[hdr]
        if job.cmd == listCmd:
            job.callback('pending', msg)
            job._appendMsg(msg)
        else:
            job.callback('successful', msg)
            job._putResult(msg)
        return

    # ---- persistent requests
    if hdr in ('PersistentGet', 'PersistentPut', 'PersistentPutDir'):
        job.callback('pending', msg)
        job._appendMsg(msg)
        return

    if hdr == 'PersistentRequestRemoved':
        self.jobs.pop(jobId, None)
        return

    # ---- USK subscriptions
    if hdr in ('SubscribedUSK', 'SubscribedUSKUpdate',
               'SubscribedUSKRoundFinished',
               'SubscribedUSKSendingToNetwork'):
        job.callback('successful', msg)
        return

    # ---- errors
    if hdr == 'ProtocolError':
        job.callback('failed', msg)
        job._putResult(FCPProtocolError(msg))
        return

    if hdr == 'IdentifierCollision':
        log(ERROR, "IdentifierCollision on id %s ???" % jobId)
        job.callback('failed', msg)
        job._putResult(Exception("Duplicate job identifier %s" % jobId))
        return

    # ---- anything else
    log(ERROR, "Unknown message type from node: %s" % hdr)
    job.callback('failed', msg)
    job._putResult(FCPException(msg))
    return
2576
2577
2578
2579
2580
2581
2582
2583
2584
def _hello(self):
    """
    Perform the initial FCP protocol handshake.

    Sends a ClientHello carrying this client's name and the expected
    protocol version, reads the node's reply and copies the fields it
    contains onto the instance (nodeVersion, nodeFCPVersion, nodeBuild,
    nodeRevision, nodeExtBuild, nodeExtRevision, nodeIsTestnet,
    connectionidentifier, compressionCodecs). A field that is missing
    or cannot be parsed is skipped and its attribute is left untouched.

    Returns the reply message as a dict.
    """
    self._txMsg("ClientHello",
                Name=self.name,
                ExpectedVersion=expectedVersion)

    resp = self._rxMsg()

    def toInt(value):
        # best-effort conversion: None signals "absent or not a number"
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    if "Version" in resp:
        self.nodeVersion = resp["Version"]
    if "FCPVersion" in resp:
        self.nodeFCPVersion = resp["FCPVersion"]

    if "Build" in resp:
        build = toInt(resp["Build"])
    else:
        # no explicit Build field: fall back to the 4th comma-separated
        # field of the version string, if one was ever received
        version = getattr(self, "nodeVersion", None)
        fields = version.split(",") if hasattr(version, "split") else []
        build = toInt(fields[3]) if len(fields) == 4 else None
    if build is not None:
        self.nodeBuild = build

    # each numeric field is looked up under its OWN key; the previous
    # code tested "Revision" before reading "ExtRevision"
    for key, attr in (("Revision", "nodeRevision"),
                      ("ExtBuild", "nodeExtBuild"),
                      ("ExtRevision", "nodeExtRevision")):
        number = toInt(resp.get(key))
        if number is not None:
            setattr(self, attr, number)

    if "Testnet" in resp:
        self.nodeIsTestnet = (resp["Testnet"] == "true")
    if "ConnectionIdentifier" in resp:
        self.connectionidentifier = resp["ConnectionIdentifier"]

    try:
        self.compressionCodecs = self._parseCompressionCodecs(
            resp["CompressionCodecs"])
    except (KeyError, IndexError, ValueError):
        # field absent or in an unexpected format
        pass

    return resp
2640
2641
2642
def _parseCompressionCodecs(self, CompressionCodecsString):
    """
    Convert the codec description sent by the node into a list of
    (name, number) pairs.

    @param CompressionCodecsString: "3 - GZIP(0), BZIP2(1), LZMA(2)"
    @return: [(name, number), ...]

    """
    # everything after the " - " separator is the comma-separated list
    codecList = CompressionCodecsString.split(" - ")[1]
    parsed = []
    for entry in codecList.split(", "):
        name, number = entry.split("(")
        # number still carries the closing parenthesis
        parsed.append((name, int(number[:-1])))
    return parsed
2657
2658
def defaultCompressionCodecsString(self):
    """
    Build the codec string accepted by the node from
    self.compressionCodecs.

    @return: "GZIP, BZIP2, LZMA" (example)

    """
    names = []
    for codec in self.compressionCodecs:
        name, _number = codec
        names.append(name)
    return ", ".join(names)
2668
2669
def _getUniqueId(self):
    """
    Allocate a unique ID for a request.

    The ID is the literal "id" followed by the current time in
    microseconds plus a random offset between 0 and that same value.
    """
    micros = int(time.time() * 1000000)
    offset = random.randint(0, micros)
    return "id%s" % (micros + offset)
2677
2678
2679
def _txMsg(self, msgType, **kw):
    """
    Low level message send.

    Arguments:
        - msgType - one of the FCP message headers, such as 'ClientHello'
    Keywords:
        - rawcmd - if given (and non-empty), this raw buffer is sent
          verbatim and nothing else is transmitted
        - Data - payload; when not None the message ends with
          DataLength / Data / <payload> instead of EndMessage
        - every other keyword is sent as a "key=value" line
    """
    log = self._log

    # 'rawcmd' and 'Data' are control keywords, never FCP fields: pop
    # them so a falsy rawcmd cannot end up on the wire as "rawcmd=..."
    rawcmd = kw.pop('rawcmd', None)
    if rawcmd:
        self.socket.sendall(rawcmd)
        log(DETAIL, "CLIENT: %s" % rawcmd)
        return

    data = kw.pop("Data", None)

    items = [msgType + "\n"]
    log(DETAIL, "CLIENT: %s" % msgType)

    for k, v in kw.items():
        line = k + "=" + str(v)
        items.append(line + "\n")
        log(DETAIL, "CLIENT: %s" % line)

    if data is not None:
        # a payload terminates the message by itself
        items.append("DataLength=%d\n" % len(data))
        log(DETAIL, "CLIENT: DataLength=%d" % len(data))
        items.append("Data\n")
        log(DETAIL, "CLIENT: ...data...")
        items.append(data)
    else:
        # no payload (including an explicit Data=None): the message
        # must still be terminated
        items.append("EndMessage\n")
        log(DETAIL, "CLIENT: EndMessage")

    self.socket.sendall("".join(items))
2732
2733
2734
def _rxMsg(self):
    """
    Receives and returns a message as a dict.

    The header keyword is included as key 'header'. Field values that
    look like integers are converted to int. When the message carries a
    payload it is either returned under key 'Data', or - if the job
    named by 'Identifier' has a stream - written to that stream, in
    which case 'Data' is None.

    Raises FCPNodeFailure if the node closes the socket mid-message.
    """
    log = self._log

    log(DETAIL, "NODE: ----------------------------")

    def nodeGone():
        # shared handling for "recv returned nothing"
        self.nodeIsAlive = False
        raise FCPNodeFailure("FCP socket closed by node")

    def read(n):
        """read exactly n bytes from the socket"""
        if n > 1:
            log(DEBUG, "read: want %d bytes" % n)
        chunks = []
        remaining = n
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                nodeGone()
            chunks.append(chunk)
            remaining -= len(chunk)
            if remaining > 0 and n > 1:
                log(DEBUG,
                    "wanted %s, got %s still need %s bytes" % (
                        n, len(chunk), remaining))
        return "".join(chunks)

    def readln():
        """read up to and including the next newline"""
        # one byte per read, so nothing past the newline is consumed
        buf = []
        while True:
            c = read(1)
            buf.append(c)
            if c == '\n':
                break
        ln = "".join(buf)
        log(DETAIL, "NODE: " + ln[:-1])
        return ln

    items = {}

    # the first non-blank line is the message header
    while True:
        line = readln().strip()
        if line:
            items['header'] = line
            break

    # then the fields, up to the terminator or the payload
    while True:
        line = readln().strip()
        if line in ['End', 'EndMessage']:
            break

        if line == 'Data':
            dataLength = items['DataLength']
            job = self.jobs[items['Identifier']]
            if job.stream:
                # copy the payload straight from socket to stream
                stream = job.stream
                remaining = dataLength
                while remaining > 0:
                    buf = self.socket.recv(remaining)
                    if not buf:
                        # an empty read means the node hung up; without
                        # this check the loop would never terminate
                        nodeGone()
                    stream.write(buf)
                    stream.flush()
                    remaining -= len(buf)
                items['Data'] = None
            else:
                items['Data'] = read(dataLength)
            log(DETAIL, "NODE: ...<%d bytes of data>" % dataLength)
            break

        try:
            k, v = line.split("=", 1)
        except ValueError:
            log(ERROR, "_rxMsg: barfed splitting '%s'" % repr(line))
            raise

        # numeric-looking values are handed back as int
        try:
            v = int(v)
        except ValueError:
            pass

        items[k] = v

    return items
2835
2836
2837
def _log(self, level, msg):
    """
    Logs a message. If level > verbosity, don't output it.

    The message is written to self.logfile (newline-terminated) when a
    logfile is set, and passed line by line, without trailing newlines,
    to self.logfunc when a log function is set.
    """
    if level > self.verbosity:
        return

    if self.logfile is not None:
        if not msg.endswith("\n"):
            msg += "\n"
        self.logfile.write(msg)
        self.logfile.flush()

    if self.logfunc is not None:
        # drop all trailing newlines, then deliver one line per call
        for msgline in msg.rstrip("\n").split("\n"):
            self.logfunc(msgline)
2856
2857
2858
2859
2860
2861
2862
2863
2864
class JobTicket:
    """
    A JobTicket is an object returned to clients making
    asynchronous requests. It puts them in control of how
    they manage n concurrent requests.

    When you as a client receive a JobTicket, you can choose to:
        - block, awaiting completion of the job
        - poll the job for completion status
        - receive a callback upon completion

    Attributes of interest:
        - isPersistent - True if job is persistent
        - isGlobal - True if job is global
        - followRedirect - follow a redirect if true, otherwise fail the get
        - value - value returned upon completion, or None if not complete
        - node - the node this job belongs to
        - id - the job Identifier
        - cmd - the FCP message header word
        - kw - the keywords in the FCP header
        - msgs - any messages received from node in connection
          to this job
    """

    def __init__(self, node, id, cmd, kw, **opts):
        """
        You should never instantiate a JobTicket object yourself
        """
        self.node = node
        self.id = id
        self.cmd = cmd

        self.verbosity = opts.get('verbosity', ERROR)
        self._log = opts.get('logger', self.defaultLogger)
        self.keep = opts.get('keep', False)
        self.stream = opts.get('stream', None)
        self.followRedirect = opts.get('followRedirect', False)

        # anything other than connection-scoped persistence counts
        self.isPersistent = (
            kw.get("Persistent", "connection") != "connection"
            or kw.get("PersistenceType", "connection") != "connection")
        self.isGlobal = (kw.get('Global', 'false') == 'true')

        # NOTE: kw is stored by reference; the pops below therefore
        # also remove 'callback' and 'timeout' from self.kw
        self.kw = kw

        self.msgs = []

        callback = kw.pop('callback', None)
        if callback:
            self.callback = callback

        self.timeout = int(kw.pop('timeout', 86400*365))
        self.timeQueued = int(time.time())
        self.timeSent = None

        # held until a result is posted by _putResult()
        self.lock = threading.Lock()
        self.lock.acquire()
        self.result = None

        # created locked; wait() polls it to learn that the request
        # has been sent to the node
        self.reqSentLock = threading.Lock()
        self.reqSentLock.acquire()

    def isComplete(self):
        """
        Returns True if the job has been completed
        """
        return self.result is not None

    def wait(self, timeout=None):
        """
        Waits forever (or for a given timeout) for a job to complete.

        Returns the job result (see getResult). With a timeout, raises
        FCPSendTimeout if the command was not sent in time and
        FCPNodeTimeout if the node did not answer in time.
        """
        log = self._log

        log(DEBUG, "wait:%s:%s: timeout=%ss" % (self.cmd, self.id, timeout))

        # no timeout: poll until the result lock becomes available
        # (polling instead of a blocking acquire - presumably to stay
        # interruptible; confirm before changing)
        if timeout is None:
            log(DEBUG, "wait:%s:%s: no timeout" % (self.cmd, self.id))
            while not self.lock.acquire(False):
                time.sleep(_pollInterval)
            self.lock.release()
            return self.getResult()

        then = int(time.time())

        # phase 1: wait for the command to be sent
        while not self.reqSentLock.acquire(False):
            elapsed = int(time.time()) - then
            if elapsed < timeout:
                time.sleep(_pollInterval)
                log(DEBUG, "wait:%s:%s: job not dispatched, timeout in %ss" % \
                     (self.cmd, self.id, timeout-elapsed))
                continue

            log(DEBUG, "wait:%s:%s: timeout on send command" % (self.cmd, self.id))
            raise FCPSendTimeout(
                    header="Command '%s' took too long to be sent to node" % self.cmd
                    )

        # hand the lock back so that later wait()/waitTillReqSent()
        # calls on the same ticket also see the request as sent
        self.reqSentLock.release()

        log(DEBUG, "wait:%s:%s: job now dispatched" % (self.cmd, self.id))

        # phase 2: wait for the node's response
        while not self.lock.acquire(False):
            elapsed = int(time.time()) - then
            if elapsed < timeout:
                time.sleep(_pollInterval)
                # only log the countdown for "real" timeouts
                if timeout < ONE_YEAR:
                    log(DEBUG, "wait:%s:%s: awaiting node response, timeout in %ss" % \
                         (self.cmd, self.id, timeout-elapsed))
                continue

            log(DEBUG, "wait:%s:%s: timeout on node response" % (self.cmd, self.id))
            raise FCPNodeTimeout(
                    header="Command '%s' took too long for node response" % self.cmd
                    )

        log(DEBUG, "wait:%s:%s: job complete" % (self.cmd, self.id))

        # leave the lock free for any other waiter
        self.lock.release()

        return self.getResult()

    def waitTillReqSent(self):
        """
        Waits till the request has been sent to node
        """
        self.reqSentLock.acquire()
        # release again so the "sent" state stays observable by wait()
        self.reqSentLock.release()

    def getResult(self):
        """
        Returns result of job, or None if job still not complete

        If result is an exception object, then raises it
        """
        if isinstance(self.result, Exception):
            raise self.result
        return self.result

    def callback(self, status, value):
        """
        This will be replaced in job ticket instances wherever
        user provides callback arguments
        """

    def cancel(self):
        """
        Cancels the job, if it is persistent

        Does nothing if the job was not persistent
        """
        if not self.isPersistent:
            return

        # forget the job locally; it may already be gone
        try:
            del self.node.jobs[self.id]
        except KeyError:
            pass

        # FCP booleans are lower-case on both branches
        if self.isGlobal:
            isGlobal = "true"
        else:
            isGlobal = "false"

        self.node._txMsg("RemovePersistentRequest",
                         Global=isGlobal,
                         Identifier=self.id)

    def _appendMsg(self, msg):
        """
        Records a message received from the node for this job
        """
        self.msgs.append(msg)

    def _putResult(self, result):
        """
        Called by manager thread to indicate job is complete,
        and submit a result to be picked up by client
        """
        self.result = result

        if not (self.keep or self.isPersistent or self.isGlobal):
            try:
                del self.node.jobs[self.id]
            except KeyError:
                pass

        # wake up waiters; a second result finds the lock already free
        try:
            self.lock.release()
        except threading.ThreadError:
            pass

    def __repr__(self):
        if "URI" in self.kw:
            uri = " URI=%s" % self.kw['URI']
        else:
            uri = ""
        return "<FCP job %s:%s%s>" % (self.id, self.cmd, uri)

    def defaultLogger(self, level, msg):
        """
        Fallback logger: writes msg, newline-terminated, to stdout
        unless level exceeds self.verbosity
        """
        if level > self.verbosity:
            return

        if not msg.endswith("\n"):
            msg += "\n"

        sys.stdout.write(msg)
        sys.stdout.flush()
3125
3126
3127
3128
3129
3130
3131
3132
def toBool(arg):
    """
    Normalise a loosely-typed truth value.

    - anything int() accepts that is non-zero -> "true"
    - a string starting (after stripping, case-insensitively) with 't'
      -> "true"; any other string, including an empty one -> "false"
    - everything else -> the plain bool of the value

    NOTE(review): the return type is mixed (the strings "true"/"false"
    for strings and non-zero numbers, a real bool otherwise). That is
    the historical behaviour and is kept so callers keep working.
    """
    try:
        arg = int(arg)
        if arg:
            return "true"
    except (TypeError, ValueError, OverflowError):
        # not a number; fall through to the string/truthiness rules
        pass

    if isinstance(arg, str):
        # startswith() is safe on "", where indexing [0] used to raise
        if arg.strip().lower().startswith('t'):
            return "true"
        return "false"

    return bool(arg)
3151
3152
3153
def readdir(dirpath, prefix='', gethashes=False):
    """
    Reads a directory, returning a sequence of file dicts.

    TODO: Currently this uses sha1 as hash. Freenet uses 256. But the
          hashes are not used.

    Arguments:
      - dirpath - relative or absolute pathname of directory to scan
      - prefix - string prepended to every relpath (used for recursion
        into subdirectories)
      - gethashes - also include a 'hash' key in each file dict, as
        returned by hashFile() for that file

    Each returned dict in the sequence has the keys:
      - fullpath - usable for opening/reading file
      - relpath - relative path of file (the part after 'dirpath'),
        for the 'SSK@blahblah//relpath' URI
      - mimetype - guesstimated mimetype for file

    Files named '.freesiterc' or ending in '~' are skipped. The result
    is sorted by relpath.

    >>> tempdir = tempfile.mkdtemp()
    >>> filename = "test.txt"
    >>> testfile = os.path.join(tempdir, filename)
    >>> with open(testfile, "w") as f:
    ...     f.write("test")
    >>> correct = [{'mimetype': 'text/plain', 'fullpath': testfile, 'relpath': filename}]
    >>> correct == readdir(tempdir)
    True
    >>> tempdir = tempfile.mkdtemp()
    >>> filename = "test"
    >>> testfile = os.path.join(tempdir, filename)
    >>> with open(testfile, "w") as f:
    ...     f.write("test")
    >>> correct = [{'mimetype': 'application/octet-stream', 'fullpath': testfile, 'relpath': filename}]
    >>> correct == readdir(tempdir)
    True
    >>> res = readdir(tempdir, gethashes=True)
    >>> res[0]["hash"] == hashFile(testfile)
    True
    """
    entries = []
    for f in os.listdir(dirpath):
        if f == '.freesiterc' or f.endswith("~"):
            continue

        relpath = prefix + f
        fullpath = os.path.join(dirpath, f)

        # os.path.isdir() follows symlinks, so linked directories are
        # covered without a separate islink/realpath test
        if os.path.isdir(fullpath):
            entries.extend(
                readdir(fullpath, relpath + os.path.sep, gethashes))
            continue

        entry = {'relpath': relpath,
                 'fullpath': fullpath,
                 'mimetype': guessMimetype(f),
                 }
        if gethashes:
            entry['hash'] = hashFile(fullpath)
        entries.append(entry)

    # key-based sort: same order as the old cmp() on relpath, and it
    # also works where cmp-style sorting is unavailable
    entries.sort(key=lambda entry: entry['relpath'])

    return entries
3223
3224
3225
def hashFile(path):
    """
    returns an SHA(1) hash of a file's contents

    The file is read in binary mode, in chunks, and is closed again
    before returning.

    >>> oslevelid, filepath = tempfile.mkstemp(text=True)
    >>> with open(filepath, "w") as f:
    ...     f.write("test")
    >>> hashFile(filepath) == hashlib.sha1("test").hexdigest()
    True
    """
    digest = hashlib.sha1()
    with open(path, "rb") as f:
        # fixed-size chunks keep memory use flat for large files
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()
3238
def sha256dda(nodehelloid, identifier, path=None):
    """
    returns a sha256 hash of a file's contents for bypassing TestDDA

    The digest covers "<nodehelloid>-<identifier>-" followed by the raw
    bytes of the file at path.

    Raises TypeError when no path is given.

    >>> oslevelid, filepath = tempfile.mkstemp(text=True)
    >>> with open(filepath, "wb") as f:
    ...     f.write("test")
    >>> print sha256dda("1","2",filepath) == hashlib.sha256("1-2-" + "test").digest()
    True
    """
    if path is None:
        # same exception type the open() call produced before, but
        # with a message that names the actual problem
        raise TypeError("sha256dda() needs the path of the file to hash")

    with open(path, "rb") as f:
        content = f.read()

    prefix = "-".join([nodehelloid, identifier, ""])
    if not isinstance(prefix, bytes):
        # text identifiers must become bytes before joining the content
        prefix = prefix.encode("utf-8")
    return hashlib.sha256(prefix + content).digest()
3251
3252
3253
def guessMimetype(filename):
    """
    Returns a guess of a mimetype based on a filename's extension

    Falls back to "application/octet-stream" when nothing is known
    about the extension.

    NOTE(review): for names ending in ".tar.bz2" a (type, encoding)
    tuple is returned instead of a plain string; this looks
    unintentional but is kept because callers may rely on it - confirm.
    """
    if filename.endswith(".tar.bz2"):
        return ('application/x-tar', 'bzip2')

    try:
        # non-strict lookup, so common unofficial types are included
        m = mimetypes.guess_type(filename, False)[0]
    except Exception:
        # guessing is best-effort; an odd filename must not abort
        m = None

    if m is None:
        m = "application/octet-stream"
    return m
3269
3270
# raw strings: \w, \s and \. are regex escapes, not string escapes
_re_slugify = re.compile(r'[^\w\s\.-]', re.UNICODE)
_re_slugify_multidashes = re.compile(r'[-\s]+', re.UNICODE)


def toUrlsafe(filename):
    """Make a filename url-safe, keeping only the basename and killing all
    potentially unfitting characters.

    The basename is decoded as UTF-8 if it is a byte string (undecodable
    bytes are dropped), reduced to ASCII via NFKD normalisation,
    stripped of everything except word characters, whitespace, dots and
    dashes, lower-cased, and finally every run of dashes/whitespace is
    collapsed into a single dash.

    :returns: urlsafe basename of the file as string."""
    base = os.path.basename(filename)
    if isinstance(base, bytes):
        # byte strings are decoded; already-decoded text is used as is
        base = base.decode("utf-8", "ignore")
    asciiOnly = unicodedata.normalize('NFKD', base).encode(
        "ascii", "ignore").decode("ascii")
    slug = _re_slugify.sub('', asciiOnly).strip().lower()
    return str(_re_slugify_multidashes.sub('-', slug))
3283
3284
3285
3286
def uriIsPrivate(uri):
    """
    analyses an SSK URI, and determines if it is an SSK or USK private key

    Only SSK@/USK@ URIs can be private. The decision is taken from the
    second byte of the decoded third comma-separated key field. URIs
    that do not have that shape, or whose field cannot be decoded,
    are reported as not private.

    for details see https://wiki.freenetproject.org/Signed_Subspace_Key

    >>> uriIsPrivate("SSK@~Udj39wzRUN4J-Kqn1aWN8kJyHL6d44VSyWoqSjL60A,iAtIH8348UGKfs8lW3mw0lm0D9WLwtsIzZhvMWelpK0,AQACAAE/")
    False
    >>> uriIsPrivate("SSK@R-skbNbiXqWkqj8FPDTusWyk7u8HLvbdysyRY3eY9A0,iAtIH8348UGKfs8lW3mw0lm0D9WLwtsIzZhvMWelpK0,AQECAAE/")
    True
    >>> uriIsPrivate("USK@AIcCHvrGspY-7J73J3VR-Td3DuPvw3IqCyjjRK6EvJol,hEvqa41cm72Wc9O1AjZ0OoDU9JVGAvHDDswIE68pT7M,AQECAAE/test.R1/0")
    True
    >>> uriIsPrivate("KSK@AIcCHvrGspY-7J73J3VR-Td3DuPvw3IqCyjjRK6EvJol,hEvqa41cm72Wc9O1AjZ0OoDU9JVGAvHDDswIE68pT7M,AQECAAE/test.R1/0")
    False
    >>> uriIsPrivate("SSK@JhtPxdPLx30sRN0c5S2Hhcsif~Yqy1lsGiAx5Wkq7Lo,-e0kLAjmmclSR7uL0TN901tS3iSx2-21Id8tUp4tyzg,AQECAAE/")
    True
    """
    # strip the optional scheme prefixes
    if uri.startswith("freenet:"):
        uri = uri[8:]
    if uri.startswith("//"):
        uri = uri[2:]

    if not uri.startswith(("SSK@", "USK@")):
        return False

    fields = uri.split(",")
    if len(fields) < 3:
        return False

    # keep only the key part in front of the path; the appended "/" is
    # itself a base64 character and completes the final 4-character
    # group of the field
    extra = fields[2].split("/")[0] + "/"
    try:
        extrabytes = bytearray(base64.b64decode(extra))
    except (TypeError, ValueError):
        # not decodable: treat like any other malformed URI
        return False

    if len(extrabytes) < 2:
        return False
    return bool(extrabytes[1])
3324
3325
3326
def parseTime(t):
    """
    Parses a time value, recognising suffices like 'm' for minutes,
    's' for seconds, 'h' for hours, 'd' for days, 'w' for weeks,
    'M' for months.

    >>> endings = {'s':1, 'm':60, 'h':60*60, 'd':60*60*24, 'w':60*60*24*7, 'M':60*60*24*30}
    >>> not False in [endings[i]*3 == parseTime("3"+i) for i in endings]
    True

    A value without suffix is taken as seconds. Non-string values are
    converted with str() first.

    Returns time value in seconds

    Raises ValueError for an empty/false value or one whose numeric
    part is not an integer.
    """
    if not t:
        raise ValueError("Invalid time '%s'" % t)

    if not isinstance(t, str):
        t = str(t)

    t = t.strip()
    if not t:
        raise ValueError("Invalid time value '%s'" % t)

    endings = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400,
               'w': 86400*7, 'M': 86400*30}

    multiplier = endings.get(t[-1])
    if multiplier is None:
        # no recognised suffix: the whole value is a number of seconds
        number = t
        multiplier = 1
    else:
        number = t[:-1]

    try:
        return int(number) * multiplier
    except ValueError:
        # report the complete offending value, not only its numeric part
        raise ValueError("Invalid time value '%s'" % t)
3360
3361
3362
3363
3364
3365
def base64encode(raw):
    """
    Encodes a string to base64, using the Freenet alphabet

    The Freenet alphabet uses '~' and '-' where standard base64 uses
    '+' and '/', and '_' as the padding character. The result contains
    no line breaks.

    Arguments:
     - raw - byte string to encode
    """
    # b64encode() applies the two alternative characters itself and
    # never inserts newlines, so only the padding needs translating
    return base64.b64encode(raw, b"~-").replace(b"=", b"_")
3380
3381
3382
def base64decode(enc):
    """
    Decodes a freenet-encoded base64 string back to a binary string

    Arguments:
     - enc - base64 string to decode
    """
    # Freenet writes the padding as '_'
    padded = enc.replace("_", "=")

    # top up to a whole number of 4-character groups
    padded += "=" * (-len(padded) % 4)

    # '~' and '-' stand in for '+' and '/'
    return base64.b64decode(padded, '~-')
3401
3402
3403
3404
3405
3406
3407
3408
3409
3410
3411
3412
3413
3414
def _base30hex(integer):
    """Turn an integer into a simple lowercase base30hex encoding.

    Digits are 0-9 followed by a-t. Zero encodes as "0".

    Raises ValueError for negative input.
    """
    base30 = "0123456789abcdefghijklmnopqrst"
    if integer < 0:
        # the digit loop below would never terminate
        raise ValueError("cannot encode negative number %r" % (integer,))
    if integer == 0:
        return "0"

    b30 = []
    while integer:
        # divmod keeps the arithmetic exact for arbitrarily large ints
        integer, digit = divmod(integer, 30)
        b30.append(base30[digit])
    return "".join(reversed(b30))
3423
3424
def _test():
    """
    Run the module's doctests and summarise the outcome as a string:
    one sad face per failed example, or a smiley followed by the
    number of attempted examples in base30hex.
    """
    import doctest
    outcome = doctest.testmod()
    failures = outcome.failed
    if not failures:
        return "^_^ (" + _base30hex(outcome.attempted) + ")"
    return "☹" * failures
3431
3432
if __name__ == "__main__":
    # run the doctests when executed as a script; the call form of
    # print gives the same output with and without the print statement
    print(_test())
3435