Package fcp :: Module node
[hide private]
[frames] | [no frames]

Source Code for Module fcp.node
   1  #!/usr/bin/env python 
   2  # encoding: utf-8 
   3  #@+leo-ver=4 
   4  #@+node:@file node.py 
   5  #@@first 
   6  """ 
   7  An implementation of a freenet client library for 
   8  FCP v2, offering considerable flexibility. 
   9   
  10  Clients should instantiate FCPNode, then execute 
  11  its methods to perform tasks with FCP. 
  12   
  13  This module was written by aum, May 2006, released under the GNU Lesser General 
  14  Public License. 
  15   
  16  No warranty, yada yada 
  17   
  18  For FCP documentation, see http://wiki.freenetproject.org/FCPv2 
  19   
  20  """ 
  21   
  22  #@+others 
  23  #@+node:imports 
  24  import Queue 
  25  import base64 
  26  import mimetypes 
  27  import os 
  28  import pprint 
  29  import random 
  30  import select 
  31  import hashlib 
  32  import socket 
  33  import stat 
  34  import sys 
  35  import tempfile 
  36  import thread 
  37  import threading 
  38  import time 
  39  import traceback 
  40  import re 
  41  import unicodedata 
  42   
  43  import pseudopythonparser 
  44   
  45  _pollInterval = 0.03 
  46   
  47  #@-node:imports 
  48  #@+node:exceptions 
class ConnectionRefused(Exception):
    """
    Raised when a TCP connection to the given FCP host/port
    cannot be established.
    """
class PrivacyRisk(Exception):
    """
    Raised when a requested operation would pose a privacy risk
    to the user.
    """
class FCPException(Exception):
    """
    Base class for errors reported over the FCP protocol.

    The fields of the node's error message are kept as a dict in
    ``self.info``; ``__str__`` summarises the human-readable fields.
    """

    def __init__(self, info=None, **kw):
        """
        Create the exception.

        Arguments:
            - info - dict of message fields from the node; if not given
              (or empty/falsy), the loose keyword arguments are used as
              the info dict instead.
        """
        if not info:
            info = kw
        self.info = info
        Exception.__init__(self, str(info))

    def __str__(self):
        """
        Return a short human-readable summary built from the
        descriptive fields of the node's message, or "??" if none
        of them are present.
        """
        parts = []
        for k in ['header', 'ShortCodeDescription', 'CodeDescription']:
            # 'k in dict' replaces the deprecated dict.has_key(k)
            if k in self.info:
                parts.append(str(self.info[k]))
        return ";".join(parts) or "??"
class FCPGetFailed(FCPException):
    """
    The node reported that a ClientGet request failed.
    """
class FCPPutFailed(FCPException):
    """
    The node reported that a ClientPut request failed.
    """
class FCPProtocolError(FCPException):
    """
    The node sent a ProtocolError message (malformed or illegal
    FCP traffic).
    """
class FCPNodeFailure(Exception):
    """
    The node connection appears to have died.
    """
class FCPSendTimeout(FCPException):
    """
    Timed out while waiting for a command to be sent to the node.
    """
class FCPNodeTimeout(FCPException):
    """
    Timed out while waiting for the node to respond.
    """
class FCPNameLookupFailure(Exception):
    """
    A name-services lookup of a freenet 'domain name' failed.
    """
106 107 #@-node:exceptions 108 #@+node:globals 109 # where we can find the freenet node FCP port 110 defaultFCPHost = "127.0.0.1" 111 defaultFCPPort = 9481 112 defaultFProxyHost = "127.0.0.1" 113 defaultFProxyPort = 8888 114 115 # may set environment vars for FCP host/port 116 if os.environ.has_key("FCP_HOST"): 117 defaultFCPHost = os.environ["FCP_HOST"].strip() 118 if os.environ.has_key("FCP_PORT"): 119 defaultFCPPort = int(os.environ["FCP_PORT"].strip()) 120 121 # ditto for fproxy host/port 122 if os.environ.has_key("FPROXY_HOST"): 123 defaultFProxyHost = os.environ["FPROXY_HOST"].strip() 124 if os.environ.has_key("FPROXY_PORT"): 125 defaultFProxyPort = int(os.environ["FPROXY_PORT"].strip()) 126 127 # poll timeout period for manager thread 128 pollTimeout = 0.1 129 #pollTimeout = 3 130 131 # list of keywords sent from node to client, which have 132 # int values 133 intKeys = [ 134 'DataLength', 'Code', 135 ] 136 137 # for the FCP 'ClientHello' handshake 138 expectedVersion="2.0" 139 140 # logger verbosity levels 141 SILENT = 0 142 FATAL = 1 143 CRITICAL = 2 144 ERROR = 3 145 INFO = 4 146 DETAIL = 5 147 DEBUG = 6 148 NOISY = 7 149 150 # peer note types 151 PEER_NOTE_PRIVATE_DARKNET_COMMENT = 1 152 153 defaultVerbosity = ERROR 154 155 ONE_YEAR = 86400 * 365 156 157 #@<<fcp_version>> 158 #@+node:<<fcp_version>> 159 fcpVersion = "0.2.5" 160 161 #@-node:<<fcp_version>> 162 #@nl 163 164 #@-node:globals 165 #@+node:class FCPNode
class FCPNode:
    """
    An interface to a freenet node via its FCP port.

    Exposes primitives for genkey, get, put and putdir operations as
    well as peer management. One instance suffices for an entire client
    application; the methods are thread-safe, so creating several
    instances only wastes resources.

    Each primitive supports three flow-control styles:

        - synchronous (default): the call blocks until the job
          completes (successfully or not) and no pending status
          messages are seen

        - asynchronous: pass 'async=True' and the call returns a job
          ticket object immediately; the ticket offers polling and
          blocking-wait methods

        - callback: pass 'callback=somefunc' where somefunc is a
          callable 'def somefunc(status, value)'. It is invoked on
          success, failure, and on each pending message. 'status' is
          one of:
              - 'successful' - value holds the primitive's result
              - 'pending'    - value holds the raw pending message
                               from the node, as a dict
              - 'failed'     - value holds a dict of the message
                               fields sent back from the node
          Callbacks may be combined with either calling mode.
    """

    svnLongRevision = "$Revision$"
    svnRevision = svnLongRevision[11:-2]

    # --- class-level attribute defaults ---
    noCloseSocket = True

    nodeIsAlive = False

    # node identity/version facts, filled in after the hello handshake
    nodeVersion = None
    nodeFCPVersion = None
    nodeBuild = None
    nodeRevision = None
    nodeExtBuild = None
    nodeExtRevision = None
    nodeIsTestnet = None
    # safe defaults, replaced with the node's advertised codec list
    compressionCodecs = [("GZIP", 0), ("BZIP2", 1), ("LZMA", 2)]
    def __init__(self, **kw):
        """
        Create a connection object.

        Keyword Arguments:
            - name - name of client to use with reqs, defaults to random. This
              is crucial if you plan on making persistent requests
            - host - hostname, defaults to environment variable FCP_HOST, and
              if this doesn't exist, then defaultFCPHost
            - port - port number, defaults to environment variable FCP_PORT, and
              if this doesn't exist, then defaultFCPPort
            - logfile - a pathname or writable file object, to which log messages
              should be written, defaults to stdout unless logfunc is specified
            - logfunc - a function to which log messages should be written, or
              None for no such function, defaults to None
            - verbosity - how detailed the log messages should be, defaults to 0
              (silence)
            - socketTimeout - value to pass to socket object's settimeout() if
              available and the value is not None, defaults to None
            - namesitefile - name-service config file path, or None for default

        Attributes of interest:
            - jobs - a dict of currently running jobs (persistent and
              nonpersistent); keys are job ids, values are JobTicket objects

        Raises:
            - Exception - on a bad 'logfile' argument, or when the TCP
              connection to the node cannot be established

        Notes:
            - when the connection is created, a 'hello' handshake takes place.
              After that handshake, the node sends back a list of outstanding
              persistent requests left over from the last connection (based on
              the value of the 'name' keyword passed in). These are wrapped in
              JobTicket instances and stored in self.jobs.
        """
        # Be sure that we have all of our attributes during __init__,
        # even if the connection attempt below fails midway.
        self.running = False
        self.nodeIsAlive = False
        self.testedDDA = {}

        # grab and save parms
        env = os.environ
        self.name = kw.get('name', self._getUniqueId())
        self.host = kw.get('host', env.get("FCP_HOST", defaultFCPHost))
        self.port = kw.get('port', env.get("FCP_PORT", defaultFCPPort))
        self.port = int(self.port)
        self.socketTimeout = kw.get('socketTimeout', None)

        # The id for the connection, assigned by the node during the hello
        self.connectionidentifier = None

        # set up the logger: default to stdout only when neither a logfile
        # nor a log function was given
        logfile = kw.get('logfile', None)
        logfunc = kw.get('logfunc', None)
        if(None == logfile and None == logfunc):
            logfile = sys.stdout
        if(None != logfile and not hasattr(logfile, 'write')):
            # no .write() attribute: might be a pathname
            if not isinstance(logfile, str):
                raise Exception("Bad logfile '%s', must be pathname or file object" % logfile)
            logfile = file(logfile, "a")
        self.logfile = logfile
        self.logfunc = logfunc
        self.verbosity = kw.get('verbosity', defaultVerbosity)

        # try to connect to node
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if(None != self.socketTimeout):
            try:
                self.socket.settimeout(self.socketTimeout)
            except Exception, e:
                # Socket timeout setting is not available until Python 2.3,
                # so ignore exceptions
                pass
        try:
            self.socket.connect((self.host, self.port))
        except Exception, e:
            # re-raise with host/port context for easier diagnosis
            raise Exception("Failed to connect to %s:%s - %s" % (self.host,
                                                                 self.port,
                                                                 e))

        # now do the hello handshake; only mark the node alive afterwards
        self._hello()
        self.nodeIsAlive = True

        # the pending job tickets
        self.jobs = {}  # keyed by request ID
        self.keepJobs = []  # job ids that should never be removed from self.jobs

        # queue for incoming client requests, consumed by the manager thread
        self.clientReqQueue = Queue.Queue()

        # launch receiver (manager) thread
        self.running = True
        self.shutdownLock = threading.Lock()
        thread.start_new_thread(self._mgrThread, ())

        # and set up the name service
        namesitefile = kw.get('namesitefile', None)
        self.namesiteInit(namesitefile)
330 331 #@-node:__init__ 332 #@+node:__del__
333 - def __del__(self):
334 """ 335 object is getting cleaned up, so disconnect 336 """ 337 # terminate the node 338 try: 339 self.shutdown() 340 except: 341 traceback.print_exc() 342 pass
343 344 #@-node:__del__ 345 #@+node:FCP Primitives 346 # basic FCP primitives 347 348 #@+others 349 #@+node:genkey
350 - def genkey(self, **kw):
351 """ 352 Generates and returns an SSK keypair 353 354 Keywords: 355 - async - whether to do this call asynchronously, and 356 return a JobTicket object 357 - callback - if given, this should be a callable which accepts 2 358 arguments: 359 - status - will be one of 'successful', 'failed' or 'pending' 360 - value - depends on status: 361 - if status is 'successful', this will contain the value 362 returned from the command 363 - if status is 'failed' or 'pending', this will contain 364 a dict containing the response from node 365 - usk - default False - if True, returns USK uris 366 - name - the path to put at end, optional 367 """ 368 id = kw.pop("id", None) 369 if not id: 370 id = self._getUniqueId() 371 372 pub, priv = self._submitCmd(id, "GenerateSSK", Identifier=id, **kw) 373 374 name = kw.get("name", None) 375 if name: 376 pub = pub + name 377 priv = priv + name 378 379 if kw.get("usk", False): 380 pub = pub.replace("SSK@", "USK@")+"/0" 381 priv = priv.replace("SSK@", "USK@")+"/0" 382 383 return pub, priv
384 385 #@-node:genkey 386
387 - def fcpPluginMessage(self, **kw):
388 """ 389 Sends an FCPPluginMessage and returns FCPPluginReply message contents 390 391 Keywords: 392 - async - whether to do this call asynchronously, and 393 return a JobTicket object 394 - callback - if given, this should be a callable which accepts 2 395 arguments: 396 - status - will be one of 'successful', 'failed' or 'pending' 397 - value - depends on status: 398 - if status is 'successful', this will contain the value 399 returned from the command 400 - if status is 'failed' or 'pending', this will contain 401 a dict containing the response from node 402 - plugin_name - A name to identify the plugin. The same as class name 403 shown on plugins page. 404 - plugin_params - a dict() containing the key-value pairs to be sent 405 to the plugin as parameters 406 """ 407 408 id = kw.pop("id", None) 409 if not id: 410 id = self._getUniqueId() 411 412 params = dict(PluginName = kw.get('plugin_name'), 413 Identifier = id, 414 async = kw.get('async',False), 415 callback = kw.get('callback',None)) 416 417 for key, val in kw.get('plugin_params',{}).iteritems(): 418 params.update({'Param.%s' % str(key) : val}) 419 420 return self._submitCmd(id, "FCPPluginMessage", **params)
421 422 #@+node:get
423 - def get(self, uri, **kw):
424 """ 425 Does a direct get of a key 426 427 Keywords: 428 - async - whether to return immediately with a job ticket object, default 429 False (wait for completion) 430 - persistence - default 'connection' - the kind of persistence for 431 this request. If 'reboot' or 'forever', this job will be able to 432 be recalled in subsequent FCP sessions. Other valid values are 433 'reboot' and 'forever', as per FCP spec 434 - Global - default false - if evaluates to true, puts this request 435 on the global queue. Note the capital G in Global. If you set this, 436 persistence must be 'reboot' or 'forever' 437 - Verbosity - default 0 - sets the Verbosity mask passed in the 438 FCP message - case-sensitive 439 - priority - the PriorityClass for retrieval, default 2, may be between 440 0 (highest) to 6 (lowest) 441 442 - dsnly - whether to only check local datastore 443 - ignoreds - don't check local datastore 444 445 - file - if given, this is a pathname to which to store the retrieved key 446 - followRedirect - follow a redirect if true, otherwise fail the get 447 - nodata - if true, no data will be returned. 
This can be a useful 448 test of whether a key is retrievable, without having to consume 449 resources by retrieving it 450 - stream - if given, this is a writeable file object, to which the 451 received data should be written a chunk at a time 452 - timeout - timeout for completion, in seconds, default one year 453 454 Returns a 3-tuple, depending on keyword args: 455 - if 'file' is given, returns (mimetype, pathname) if key is returned 456 - if 'file' is not given, returns (mimetype, data, msg) if key is returned 457 - if 'nodata' is true, returns (mimetype, 1) if key is returned 458 - if 'stream' is given, returns (mimetype, None) if key is returned, 459 because all the data will have been written to the stream 460 If key is not found, raises an exception 461 """ 462 self._log(INFO, "get: uri=%s" % uri) 463 464 self._log(DETAIL, "get: kw=%s" % kw) 465 466 # --------------------------------- 467 # format the request 468 opts = {} 469 470 id = kw.pop("id", None) 471 if not id: 472 id = self._getUniqueId() 473 474 opts['async'] = kw.pop('async', False) 475 opts['followRedirect'] = kw.pop('followRedirect', False) 476 opts['waituntilsent'] = kw.get('waituntilsent', False) 477 if kw.has_key('callback'): 478 opts['callback'] = kw['callback'] 479 opts['Persistence'] = kw.pop('persistence', 'connection') 480 if kw.get('Global', False): 481 print "global get" 482 opts['Global'] = "true" 483 else: 484 opts['Global'] = "false" 485 486 opts['Verbosity'] = kw.get('Verbosity', 0) 487 488 if opts['Global'] == 'true' and opts['Persistence'] == 'connection': 489 raise Exception("Global requests must be persistent") 490 491 file = kw.pop("file", None) 492 if file: 493 # make sure we have an absolute path 494 file = os.path.abspath(file) 495 opts['ReturnType'] = "disk" 496 #opts['File'] = file 497 opts['Filename'] = file 498 # need to do a TestDDARequest to have a chance of a 499 # successful get to file. 
500 self.testDDA(Directory=os.path.dirname(file), 501 WantWriteDirectory=True) 502 503 elif kw.get('nodata', False): 504 nodata = True 505 opts['ReturnType'] = "none" 506 elif kw.has_key('stream'): 507 opts['ReturnType'] = "direct" 508 opts['stream'] = kw['stream'] 509 else: 510 nodata = False 511 opts['ReturnType'] = "direct" 512 513 opts['Identifier'] = id 514 515 if kw.get("ignoreds", False): 516 opts["IgnoreDS"] = "true" 517 else: 518 opts["IgnoreDS"] = "false" 519 520 if kw.get("dsonly", False): 521 opts["DSOnly"] = "true" 522 else: 523 opts["DSOnly"] = "false" 524 525 # if uri.startswith("freenet:CHK@") or uri.startswith("CHK@"): 526 # uri = os.path.splitext(uri)[0] 527 528 # process uri, including possible namesite lookups 529 uri = uri.split("freenet:")[-1] 530 if len(uri) < 5 or (uri[:4] not in ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@')): 531 # we seem to have a 'domain name' uri 532 try: 533 domain, rest = uri.split("/", 1) 534 except: 535 domain = uri 536 rest = '' 537 538 tgtUri = self.namesiteLookup(domain) 539 if not tgtUri: 540 raise FCPNameLookupFailure( 541 "Failed to resolve freenet domain '%s'" % domain) 542 if rest: 543 uri = (tgtUri + "/" + rest).replace("//", "/") 544 else: 545 uri = tgtUri 546 547 opts['URI'] = uri 548 549 opts['MaxRetries'] = kw.get("maxretries", -1) 550 opts['MaxSize'] = kw.get("maxsize", "1000000000000") 551 opts['PriorityClass'] = int(kw.get("priority", 2)) 552 553 opts['timeout'] = int(kw.pop("timeout", ONE_YEAR)) 554 555 #print "get: opts=%s" % opts 556 557 # --------------------------------- 558 # now enqueue the request 559 return self._submitCmd(id, "ClientGet", **opts)
560 561 #@-node:get 562 #@+node:put
563 - def put(self, uri="CHK@", **kw):
564 """ 565 Inserts a key 566 567 Arguments: 568 - uri - uri under which to insert the key 569 570 Keywords - you must specify one of the following to choose an insert mode: 571 - file - path of file from which to read the key data 572 - data - the raw data of the key as string 573 - dir - the directory to insert, for freesite insertion 574 - redirect - the target URI to redirect to 575 576 Keywords for 'file' mode: 577 - name - human-readable target filename - default is taken from URI 578 579 Keywords for 'dir' mode: 580 - name - name of the freesite, the 'sitename' in SSK@privkey/sitename' 581 - usk - whether to insert as a USK (USK@privkey/sitename/version/), default False 582 - version - valid if usk is true, default 0 583 584 Keywords for 'file' and 'data' modes: 585 - chkonly - only generate CHK, don't insert - default false 586 - dontcompress - do not compress on insert - default false 587 588 Keywords for 'file', 'data' and 'redirect' modes: 589 - mimetype - the mime type, default text/plain 590 591 Keywords valid for all modes: 592 - async - whether to do the job asynchronously, returning a job ticket 593 object (default False) 594 - waituntilsent - default False, if True, and if async=True, waits 595 until the command has been sent to the node before returning a 596 job object 597 - persistence - default 'connection' - the kind of persistence for 598 this request. If 'reboot' or 'forever', this job will be able to 599 be recalled in subsequent FCP sessions. Other valid values are 600 'reboot' and 'forever', as per FCP spec 601 - Global - default false - if evaluates to true, puts this request 602 on the global queue. Note the capital G in Global. 
If you set this, 603 persistence must be 'reboot' or 'forever' 604 - Verbosity - default 0 - sets the Verbosity mask passed in the 605 FCP message - case-sensitive 606 607 - maxretries - maximum number of retries, default 3 608 - priority - the PriorityClass for retrieval, default 3, may be between 609 0 (highest) to 6 (lowest) 610 - realtime true/false - sets the RealTimeRequest flag. 611 612 - timeout - timeout for completion, in seconds, default one year 613 614 Notes: 615 - exactly one of 'file', 'data' or 'dir' keyword arguments must be present 616 """ 617 # divert to putdir if dir keyword present 618 if kw.has_key('dir'): 619 self._log(DETAIL, "put => putdir") 620 return self.putdir(uri, **kw) 621 622 # --------------------------------- 623 # format the request 624 opts = {} 625 626 opts['async'] = kw.get('async', False) 627 opts['waituntilsent'] = kw.get('waituntilsent', False) 628 opts['keep'] = kw.get('keep', False) 629 if kw.has_key('callback'): 630 opts['callback'] = kw['callback'] 631 632 self._log(DETAIL, "put: uri=%s async=%s waituntilsent=%s" % ( 633 uri, opts['async'], opts['waituntilsent'])) 634 635 opts['Persistence'] = kw.pop('persistence', 'connection') 636 if kw.get('Global', False): 637 opts['Global'] = "true" 638 else: 639 opts['Global'] = "false" 640 641 if opts['Global'] == 'true' and opts['Persistence'] == 'connection': 642 raise Exception("Global requests must be persistent") 643 644 # process uri, including possible namesite lookups 645 uri = uri.split("freenet:")[-1] 646 if len(uri) < 4 or (uri[:4] not in ('SSK@', 'KSK@', 'CHK@', 'USK@', 'SVK@')): 647 # we seem to have a 'domain name' uri 648 try: 649 domain, rest = uri.split("/", 1) 650 except: 651 domain = uri 652 rest = '' 653 654 tgtUri = self.namesiteLookup(domain) 655 if not tgtUri: 656 raise FCPNameLookupFailure( 657 "Failed to resolve freenet domain '%s'" % domain) 658 if rest: 659 uri = (tgtUri + "/" + rest).replace("//", "/") 660 else: 661 uri = tgtUri 662 663 opts['URI'] = 
uri 664 665 # determine a mimetype 666 mimetype = kw.get("mimetype", None) 667 if mimetype is None: 668 # not explicitly given - figure one out (based on filename) 669 ext = os.path.splitext(uri)[1] 670 if ext: 671 # only use basename, if it has an extension 672 filename = os.path.basename(uri) 673 else: 674 # no CHK@ file extension, try for filename (only in "file" mode) 675 if kw.get('file', None) is not None: 676 filename = os.path.basename(kw['file']) 677 else: 678 # last resort fallback: use the full uri. 679 filename = uri 680 681 # got some kind of 'filename with extension', convert to mimetype 682 mimetype = guessMimetype(filename) 683 684 # now can specify the mimetype 685 opts['Metadata.ContentType'] = mimetype 686 687 id = kw.pop("id", None) 688 if not id: 689 id = self._getUniqueId() 690 opts['Identifier'] = id 691 692 chkOnly = toBool(kw.get("chkonly", "false")) 693 694 opts['Verbosity'] = kw.get('Verbosity', 0) 695 opts['MaxRetries'] = kw.get("maxretries", -1) 696 opts['PriorityClass'] = kw.get("priority", 3) 697 opts['RealTimeFlag'] = toBool(kw.get("realtime", "false")) 698 opts['GetCHKOnly'] = chkOnly 699 opts['DontCompress'] = toBool(kw.get("nocompress", "false")) 700 opts['Codecs'] = kw.get('Codecs', 701 self.defaultCompressionCodecsString()) 702 703 if kw.has_key("file"): 704 filepath = os.path.abspath(kw['file']) 705 opts['UploadFrom'] = "disk" 706 opts['Filename'] = filepath 707 if not kw.has_key("mimetype"): 708 opts['Metadata.ContentType'] = guessMimetype(kw['file']) 709 # Add a base64 encoded sha256 hash of the file to sidestep DDA 710 opts['FileHash'] = base64.encodestring( 711 sha256dda(self.connectionidentifier, id, 712 path=filepath)) 713 714 elif kw.has_key("data"): 715 opts["UploadFrom"] = "direct" 716 opts["Data"] = kw['data'] 717 targetFilename = kw.get('name') 718 if targetFilename: 719 opts["TargetFilename"] = targetFilename 720 721 elif kw.has_key("redirect"): 722 opts["UploadFrom"] = "redirect" 723 opts["TargetURI"] = 
kw['redirect'] 724 elif chkOnly != "true": 725 raise Exception("Must specify file, data or redirect keywords") 726 727 if "TargetFilename" in kw: # for CHKs 728 opts["TargetFilename"] = kw["TargetFilename"] 729 730 731 opts['timeout'] = int(kw.get("timeout", ONE_YEAR)) 732 733 #print "sendEnd=%s" % sendEnd 734 735 # --------------------------------- 736 # now dispatch the job 737 return self._submitCmd(id, "ClientPut", **opts)
738 739 #@-node:put 740 #@+node:putdir
    def putdir(self, uri, **kw):
        """
        Inserts a freesite.

        Arguments:
            - uri - uri under which to insert the key

        Keywords:
            - dir - the directory to insert - mandatory, no default.
              This directory must contain a toplevel index.html file
            - name - the name of the freesite, defaults to 'freesite'
            - usk - set to True to insert as USK (default False)
            - version - the USK version number, default 0

            - filebyfile - default False - if True, manually inserts
              each constituent file, then performs the ClientPutComplexDir
              as a manifest full of redirects. You *must* use this mode
              if inserting from across a LAN

            - maxretries - maximum number of retries, default 3
            - priority - the PriorityClass for insertion, default 4, may be
              between 0 (highest) to 6 (lowest)

            - id - the job identifier, for persistent requests
            - async - default False - if True, return immediately with a job
              ticket
            - persistence - default 'connection' - the kind of persistence for
              this request. If 'reboot' or 'forever', this job will be able to
              be recalled in subsequent FCP sessions
            - Global - default false - if evaluates to true, puts this request
              on the global queue. Note the capital G in Global. If you set
              this, persistence must be 'reboot' or 'forever'
            - Verbosity - default 0 - sets the Verbosity mask passed in the
              FCP message - case-sensitive
            - allatonce - default False - if set, and if filebyfile is set,
              then all files of the site will be inserted simultaneously,
              which can give a nice speed-up for small to moderate sites, but
              cruel choking on large sites; use with care
            - maxconcurrent - cap on simultaneous file inserts (implies
              filebyfile and allatonce), default 10
            - globalqueue - perform the inserts on the global queue, which
              will survive node reboots
            - manifest - optional pre-built manifest dict; if absent, the
              manifest is derived by scanning 'dir'

            - timeout - timeout for completion, in seconds, default one year

        Returns:
            - the URI under which the freesite can be retrieved, or a job
              ticket if async
        """
        log = self._log
        log(INFO, "putdir: uri=%s dir=%s" % (uri, kw['dir']))

        # --------------------------------------------------------------
        # process keyword args

        chkonly = False

        # get keyword args
        dir = kw['dir']
        sitename = kw.get('name', 'freesite')
        usk = kw.get('usk', False)
        version = kw.get('version', 0)
        maxretries = kw.get('maxretries', 3)
        priority = kw.get('priority', 4)
        Verbosity = kw.get('Verbosity', 0)

        filebyfile = kw.get('filebyfile', False)

        # allatonce implies filebyfile
        if kw.has_key('allatonce'):
            allAtOnce = kw['allatonce']
            filebyfile = True
        else:
            allAtOnce = False

        # maxconcurrent implies filebyfile AND allatonce
        if kw.has_key('maxconcurrent'):
            maxConcurrent = kw['maxconcurrent']
            filebyfile = True
            allAtOnce = True
        else:
            maxConcurrent = 10

        if kw.get('globalqueue', False):
            globalMode = True
            globalWord = "true"  # NOTE: currently unused below
            persistence = "forever"
        else:
            globalMode = False
            globalWord = "false"
            persistence = "connection"

        id = kw.pop("id", None)
        if not id:
            id = self._getUniqueId()

        codecs = kw.get('Codecs',
                        self.defaultCompressionCodecsString())

        # derive final URI for insert; for USK mode append the version
        # and strip any trailing slashes
        uriFull = uri + sitename + "/"
        if kw.get('usk', False):
            uriFull += "%d/" % int(version)
            uriFull = uriFull.replace("SSK@", "USK@")
            while uriFull.endswith("/"):
                uriFull = uriFull[:-1]

        manifestDict = kw.get('manifest', None)

        # --------------------------------------------------------------
        # procure a manifest dict, whether supplied by caller or derived
        if manifestDict:
            # work from the manifest provided by caller; only changed files
            # (and index.html, always) are re-inserted
            manifest = []
            for relpath, attrDict in manifestDict.items():
                if attrDict['changed'] or (relpath == "index.html"):
                    attrDict['relpath'] = relpath
                    attrDict['fullpath'] = os.path.join(dir, relpath)
                    manifest.append(attrDict)
        else:
            # build manifest by reading the directory
            manifest = readdir(kw['dir'])
            manifestDict = {}
            for rec in manifest:
                manifestDict[rec['relpath']] = rec

        # --------------------------------------------------------------
        # NOTE: disabled legacy path (if 0:) - kept for reference; it
        # pre-derived CHKs for all items and built a chk-based manifest
        if 0:
            # derive CHKs for all items
            log(INFO, "putdir: determining chks for all files")

            for filerec in manifest:
                # get the record and its fields
                relpath = filerec['relpath']
                fullpath = filerec['fullpath']
                mimetype = filerec['mimetype']

                # get raw file contents
                raw = file(fullpath, "rb").read()

                # determine CHK (chkonly=True means no actual insert)
                uri = self.put("CHK@",
                               data=raw,
                               mimetype=mimetype,
                               Verbosity=Verbosity,
                               chkonly=True,
                               priority=priority,
                               )

                if uri != filerec.get('uri', None):
                    filerec['changed'] = True
                    filerec['uri'] = uri

                log(INFO, "%s -> %s" % (relpath, uri))

            # build chk-based manifest command buffer, since we know all
            # the file chks
            if filebyfile:
                msgLines = ["ClientPutComplexDir",
                            "Identifier=%s" % id,
                            "Verbosity=%s" % Verbosity,
                            "MaxRetries=%s" % maxretries,
                            "PriorityClass=%s" % priority,
                            "URI=%s" % uriFull,
                            "Codecs=%s" % codecs,
                            "DefaultName=index.html",
                            ]
                # support global queue option
                if globalMode:
                    msgLines.extend([
                        "Persistence=forever",
                        "Global=true",
                        ])
                else:
                    msgLines.extend([
                        "Persistence=connection",
                        "Global=false",
                        ])

                # add each file's entry to the command buffer
                n = 0
                default = None
                for filerec in manifest:
                    relpath = filerec['relpath']
                    mimetype = filerec['mimetype']

                    log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))

                    msgLines.extend(["Files.%d.Name=%s" % (n, relpath),
                                     "Files.%d.UploadFrom=redirect" % n,
                                     "Files.%d.TargetURI=%s" % (n, filerec['uri']),
                                     ])
                    n += 1

                # finish the command buffer
                msgLines.append("EndMessage")
                manifestInsertCmdBuf = "\n".join(msgLines) + "\n"

                # gotta log the command buffer here, since it's not sent
                # via .put()
                for line in msgLines:
                    log(DETAIL, line)

        # --------------------------------------------------------------
        # for file-by-file mode, queue up the inserts and await completion
        jobs = []

        if filebyfile:

            log(INFO, "putdir: starting file-by-file inserts")

            lastProgressMsgTime = time.time()

            # insert each file, one at a time
            nTotal = len(manifest)

            # output status messages, and manage concurrent inserts
            while True:
                # get progress counts
                nQueued = len(jobs)
                nComplete = len(
                    filter(
                        lambda j: j.isComplete(),
                        jobs
                        )
                    )
                nWaiting = nTotal - nQueued
                nInserting = nQueued - nComplete

                # spit a progress message every 10 seconds
                now = time.time()
                if now - lastProgressMsgTime >= 10:
                    lastProgressMsgTime = time.time()
                    log(INFO,
                        "putdir: waiting=%s inserting=%s done=%s total=%s" % (
                            nWaiting, nInserting, nComplete, nTotal)
                        )

                # can bail if all done
                if nComplete == nTotal:
                    log(INFO, "putdir: all inserts completed (or failed)")
                    break

                # wait and go round again if concurrent inserts are maxed
                if nInserting >= maxConcurrent:
                    time.sleep(_pollInterval)
                    continue

                # just go round again if manifest is empty (all remaining
                # are in progress)
                if len(manifest) == 0:
                    time.sleep(_pollInterval)
                    continue

                # got >0 waiting jobs and >0 spare slots, so we can submit
                # a new one
                filerec = manifest.pop(0)
                relpath = filerec['relpath']
                fullpath = filerec['fullpath']
                mimetype = filerec['mimetype']

                log(INFO, "Launching insert of %s" % relpath)

                # gotta suck raw data, since we might be inserting to a
                # remote FCP service (which means we can't use 'file='
                # (UploadFrom=pathname) keyword)
                raw = file(fullpath, "rb").read()

                print "globalMode=%s persistence=%s" % (globalMode, persistence)

                # fire up the insert job asynchronously
                job = self.put("CHK@",
                               data=raw,
                               mimetype=mimetype,
                               async=1,
                               waituntilsent=1,
                               Verbosity=Verbosity,
                               chkonly=chkonly,
                               priority=priority,
                               Global=globalMode,
                               Persistence=persistence,
                               )
                jobs.append(job)
                filerec['job'] = job
                # back-reference so the manifest pass below can find the
                # file record from the job
                job.filerec = filerec

                # wait for that job to finish if we are in the slow
                # 'one at a time' mode
                if not allAtOnce:
                    job.wait()
                    log(INFO, "Insert finished for %s" % relpath)

            # all done
            log(INFO, "All raw files now inserted (or failed)")

        # --------------------------------------------------------------
        # now can build up a command buffer to insert the manifest
        msgLines = ["ClientPutComplexDir",
                    "Identifier=%s" % id,
                    "Verbosity=%s" % Verbosity,
                    "MaxRetries=%s" % maxretries,
                    "PriorityClass=%s" % priority,
                    "URI=%s" % uriFull,
                    "Codecs=%s" % codecs,
                    "DefaultName=index.html",
                    ]
        # support global queue option
        if kw.get('Global', False):
            msgLines.extend([
                "Persistence=forever",
                "Global=true",
                ])
        else:
            msgLines.extend([
                "Persistence=connection",
                "Global=false",
                ])

        # add each file's entry to the command buffer
        n = 0
        default = None
        for job in jobs:
            filerec = job.filerec
            relpath = filerec['relpath']
            fullpath = filerec['fullpath']
            mimetype = filerec['mimetype']

            # don't add if the file failed to insert
            if filebyfile:
                if isinstance(filerec['job'].result, Exception):
                    log(ERROR, "File %s failed to insert" % relpath)
                    continue

            log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))

            msgLines.extend(["Files.%d.Name=%s" % (n, relpath),
                             ])
            if filebyfile:
                # redirect to the per-file CHK we just inserted
                uri = job.result
                if not uri:
                    raise Exception("Can't find a URI for file %s" % filerec['relpath'])

                msgLines.extend(["Files.%d.UploadFrom=redirect" % n,
                                 "Files.%d.TargetURI=%s" % (n, uri),
                                 ])
            else:
                msgLines.extend(["Files.%d.UploadFrom=disk" % n,
                                 "Files.%d.Filename=%s" % (n, fullpath),
                                 ])
            n += 1

        # finish the command buffer
        msgLines.append("EndMessage")
        manifestInsertCmdBuf = "\n".join(msgLines) + "\n"

        # gotta log the command buffer here, since it's not sent via .put()
        for line in msgLines:
            log(DETAIL, line)

        # --------------------------------------------------------------
        # now dispatch the manifest insertion job
        if chkonly:
            finalResult = "no_uri"
        else:
            finalResult = self._submitCmd(
                id, "ClientPutComplexDir",
                rawcmd=manifestInsertCmdBuf,
                async=kw.get('async', False),
                waituntilsent=kw.get('waituntilsent', False),
                callback=kw.get('callback', False),
                )

        # finally all done, return result or job ticket
        return finalResult
1169
1170 - def modifyconfig(self, **kw):
1171 """ 1172 Modifies node configuration 1173 1174 Keywords: 1175 - async - whether to do this call asynchronously, and 1176 return a JobTicket object 1177 - callback - if given, this should be a callable which accepts 2 1178 arguments: 1179 - status - will be one of 'successful', 'failed' or 'pending' 1180 - value - depends on status: 1181 - if status is 'successful', this will contain the value 1182 returned from the command 1183 - if status is 'failed' or 'pending', this will contain 1184 a dict containing the response from node 1185 - keywords, which are the same as for the FCP message and documented in the wiki: http://wiki.freenetproject.org/FCP2p0ModifyConfig 1186 """ 1187 return self._submitCmd("__global", "ModifyConfig", **kw)
1188 1189 #@-node:putdir 1190 #@+node:getconfig
1191 - def getconfig(self, **kw):
1192 """ 1193 Gets node configuration 1194 1195 Keywords: 1196 - async - whether to do this call asynchronously, and 1197 return a JobTicket object 1198 - callback - if given, this should be a callable which accepts 2 1199 arguments: 1200 - status - will be one of 'successful', 'failed' or 'pending' 1201 - value - depends on status: 1202 - if status is 'successful', this will contain the value 1203 returned from the command 1204 - if status is 'failed' or 'pending', this will contain 1205 a dict containing the response from node 1206 - WithCurrent - default False - if True, the current configuration settings will be returned in the "current" tree of the ConfigData message fieldset 1207 - WithShortDescription - default False - if True, the configuration setting short descriptions will be returned in the "shortDescription" tree of the ConfigData message fieldset 1208 - other keywords, which are the same as for the FCP message and documented in the wiki: http://wiki.freenetproject.org/FCP2p0GetConfig 1209 """ 1210 1211 return self._submitCmd("__global", "GetConfig", **kw)
1212 1213 #@-node:getconfig 1214 #@+node:invertprivate
1215 - def invertprivate(self, privatekey):
1216 """ 1217 Converts an SSK or USK private key to a public equivalent 1218 """ 1219 privatekey = privatekey.strip().split("freenet:")[-1] 1220 1221 isUsk = privatekey.startswith("USK@") 1222 1223 if isUsk: 1224 privatekey = privatekey.replace("USK@", "SSK@") 1225 1226 bits = privatekey.split("/", 1) 1227 mainUri = bits[0] 1228 1229 uri = self.put(mainUri+"/foo", data="bar", chkonly=1) 1230 1231 uri = uri.split("/")[0] 1232 uri = "/".join([uri] + bits[1:]) 1233 1234 if isUsk: 1235 uri = uri.replace("SSK@", "USK@") 1236 1237 return uri
1238 1239 #@-node:invertprivate 1240 #@+node:redirect
1241 - def redirect(self, srcKey, destKey, **kw):
1242 """ 1243 Inserts key srcKey, as a redirect to destKey. 1244 srcKey must be a KSK, or a path-less SSK or USK (and not a CHK) 1245 """ 1246 uri = self.put(srcKey, redirect=destKey, **kw) 1247 1248 return uri
1249 1250 #@-node:redirect 1251 #@+node:genchk
1252 - def genchk(self, **kw):
1253 """ 1254 Returns the CHK URI under which a data item would be 1255 inserted. 1256 1257 Keywords - you must specify one of the following: 1258 - file - path of file from which to read the key data 1259 - data - the raw data of the key as string 1260 1261 Keywords - optional: 1262 - mimetype - defaults to text/plain - THIS AFFECTS THE CHK!! 1263 """ 1264 return self.put(chkonly=True, **kw)
1265 1266 #@-node:genchk 1267 #@+node:listpeers
1268 - def listpeers(self, **kw):
1269 """ 1270 Gets the list of peers from the node 1271 1272 Keywords: 1273 - async - whether to do this call asynchronously, and 1274 return a JobTicket object 1275 - callback - if given, this should be a callable which accepts 2 1276 arguments: 1277 - status - will be one of 'successful', 'failed' or 'pending' 1278 - value - depends on status: 1279 - if status is 'successful', this will contain the value 1280 returned from the command 1281 - if status is 'failed' or 'pending', this will contain 1282 a dict containing the response from node 1283 - WithMetadata - default False - if True, returns a peer's metadata 1284 - WithVolatile - default False - if True, returns a peer's volatile info 1285 """ 1286 1287 return self._submitCmd("__global", "ListPeers", **kw)
1288 1289 #@-node:listpeers 1290 #@+node:listpeernotes
1291 - def listpeernotes(self, **kw):
1292 """ 1293 Gets the list of peer notes for a given peer from the node 1294 1295 Keywords: 1296 - async - whether to do this call asynchronously, and 1297 return a JobTicket object 1298 - callback - if given, this should be a callable which accepts 2 1299 arguments: 1300 - status - will be one of 'successful', 'failed' or 'pending' 1301 - value - depends on status: 1302 - if status is 'successful', this will contain the value 1303 returned from the command 1304 - if status is 'failed' or 'pending', this will contain 1305 a dict containing the response from node 1306 - NodeIdentifier - one of name, identity or IP:port for the desired peer 1307 """ 1308 1309 return self._submitCmd("__global", "ListPeerNotes", **kw)
1310 1311 #@-node:listpeernotes 1312 #@+node:refstats
1313 - def refstats(self, **kw):
1314 """ 1315 Gets node reference and possibly node statistics. 1316 1317 Keywords: 1318 - async - whether to do this call asynchronously, and 1319 return a JobTicket object 1320 - callback - if given, this should be a callable which accepts 2 1321 arguments: 1322 - status - will be one of 'successful', 'failed' or 'pending' 1323 - value - depends on status: 1324 - if status is 'successful', this will contain the value 1325 returned from the command 1326 - if status is 'failed' or 'pending', this will contain 1327 a dict containing the response from node 1328 - GiveOpennetRef - default False - if True, return the node's Opennet reference rather than the node's Darknet reference 1329 - WithPrivate - default False - if True, includes the node's private node reference fields 1330 - WithVolatile - default False - if True, returns a node's volatile info 1331 """ 1332 1333 return self._submitCmd("__global", "GetNode", **kw)
1334 1335 #@-node:refstats 1336 #@+node:testDDA
1337 - def testDDA(self, **kw):
1338 """ 1339 Test for Direct Disk Access capability on a directory (can the node and the FCP client both access the same directory?) 1340 1341 Keywords: 1342 - async - whether to do this call asynchronously, and 1343 return a JobTicket object 1344 - callback - if given, this should be a callable which accepts 2 1345 arguments: 1346 - status - will be one of 'successful', 'failed' or 'pending' 1347 - value - depends on status: 1348 - if status is 'successful', this will contain the value 1349 returned from the command 1350 - if status is 'failed' or 'pending', this will contain 1351 a dict containing the response from node 1352 - Directory - directory to test 1353 - WithReadDirectory - default False - if True, want node to read from directory for a put operation 1354 - WithWriteDirectory - default False - if True, want node to write to directory for a get operation 1355 """ 1356 # cache the testDDA: 1357 DDAkey = (kw["Directory"], kw["WithReadDirectory"], kw["WithWriteDirectory"]) 1358 try: 1359 return self.testedDDA[DDAkey] 1360 except KeyError: 1361 pass # we actually have to test this dir. 
1362 requestResult = self._submitCmd("__global", "TestDDARequest", **kw) 1363 writeFilename = None; 1364 kw = {}; 1365 kw[ 'Directory' ] = requestResult[ 'Directory' ]; 1366 if( requestResult.has_key( 'ReadFilename' )): 1367 readFilename = requestResult[ 'ReadFilename' ]; 1368 readFile = open( readFilename, 'rb' ); 1369 readFileContents = readFile.read(); 1370 readFile.close(); 1371 kw[ 'ReadFilename' ] = readFilename; 1372 kw[ 'ReadContent' ] = readFileContents; 1373 1374 if( requestResult.has_key( 'WriteFilename' ) and requestResult.has_key( 'ContentToWrite' )): 1375 writeFilename = requestResult[ 'WriteFilename' ]; 1376 contentToWrite = requestResult[ 'ContentToWrite' ]; 1377 writeFile = open( writeFilename, 'w+b' ); 1378 writeFileContents = writeFile.write( contentToWrite ); 1379 writeFile.close(); 1380 writeFileStatObject = os.stat( writeFilename ); 1381 writeFileMode = writeFileStatObject.st_mode; 1382 os.chmod( writeFilename, writeFileMode | stat.S_IREAD | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ); 1383 1384 responseResult = self._submitCmd("__global", "TestDDAResponse", **kw) 1385 if( None != writeFilename ): 1386 try: 1387 os.remove( writeFilename ); 1388 except OSError, msg: 1389 pass; 1390 # cache this result, so we do not calculate it twice. 1391 self.testedDDA[DDAkey] = responseResult 1392 return responseResult;
1393 1394 #@-node:testDDA 1395 #@+node:addpeer
1396 - def addpeer(self, **kw):
1397 """ 1398 Add a peer to the node 1399 1400 Keywords: 1401 - async - whether to do this call asynchronously, and 1402 return a JobTicket object 1403 - callback - if given, this should be a callable which accepts 2 1404 arguments: 1405 - status - will be one of 'successful', 'failed' or 'pending' 1406 - value - depends on status: 1407 - if status is 'successful', this will contain the value 1408 returned from the command 1409 - if status is 'failed' or 'pending', this will contain 1410 a dict containing the response from node 1411 - File - filepath of a file containing a noderef in the node's directory 1412 - URL - URL of a copy of a peer's noderef to add 1413 - kwdict - If neither File nor URL are provided, the fields of a noderef can be passed in the form of a Python dictionary using the kwdict keyword 1414 """ 1415 1416 return self._submitCmd("__global", "AddPeer", **kw)
1417 1418 #@-node:addpeer 1419 #@+node:listpeer
1420 - def listpeer(self, **kw):
1421 """ 1422 Modify settings on one of the node's peers 1423 1424 Keywords: 1425 - async - whether to do this call asynchronously, and 1426 return a JobTicket object 1427 - callback - if given, this should be a callable which accepts 2 1428 arguments: 1429 - status - will be one of 'successful', 'failed' or 'pending' 1430 - value - depends on status: 1431 - if status is 'successful', this will contain the value 1432 returned from the command 1433 - if status is 'failed' or 'pending', this will contain 1434 a dict containing the response from node 1435 - NodeIdentifier - one of name (except for opennet peers), identity or IP:port for the desired peer 1436 """ 1437 1438 return self._submitCmd("__global", "ListPeer", **kw)
1439 1440 #@-node:listpeer 1441 #@+node:modifypeer
1442 - def modifypeer(self, **kw):
1443 """ 1444 Modify settings on one of the node's peers 1445 1446 Keywords: 1447 - async - whether to do this call asynchronously, and 1448 return a JobTicket object 1449 - callback - if given, this should be a callable which accepts 2 1450 arguments: 1451 - status - will be one of 'successful', 'failed' or 'pending' 1452 - value - depends on status: 1453 - if status is 'successful', this will contain the value 1454 returned from the command 1455 - if status is 'failed' or 'pending', this will contain 1456 a dict containing the response from node 1457 - IsDisabled - default False - enables or disabled the peer accordingly 1458 - IsListenOnly - default False - sets ListenOnly on the peer 1459 - NodeIdentifier - one of name, identity or IP:port for the desired peer 1460 """ 1461 1462 return self._submitCmd("__global", "ModifyPeer", **kw)
1463 1464 #@-node:modifypeer 1465 #@+node:modifypeernote
1466 - def modifypeernote(self, **kw):
1467 """ 1468 Modify settings on one of the node's peers 1469 1470 Keywords: 1471 - async - whether to do this call asynchronously, and 1472 return a JobTicket object 1473 - callback - if given, this should be a callable which accepts 2 1474 arguments: 1475 - status - will be one of 'successful', 'failed' or 'pending' 1476 - value - depends on status: 1477 - if status is 'successful', this will contain the value 1478 returned from the command 1479 - if status is 'failed' or 'pending', this will contain 1480 a dict containing the response from node 1481 - NodeIdentifier - one of name, identity or IP:port for the desired peer 1482 - NoteText - base64 encoded string of the desired peer note text 1483 - PeerNoteType - code number of peer note type: currently only private peer note is supported by the node with code number 1 1484 """ 1485 1486 return self._submitCmd("__global", "ModifyPeerNote", **kw)
1487 1488 #@-node:modifypeernote 1489 #@+node:removepeer
1490 - def removepeer(self, **kw):
1491 """ 1492 Removes a peer from the node 1493 1494 Keywords: 1495 - async - whether to do this call asynchronously, and 1496 return a JobTicket object 1497 - callback - if given, this should be a callable which accepts 2 1498 arguments: 1499 - status - will be one of 'successful', 'failed' or 'pending' 1500 - value - depends on status: 1501 - if status is 'successful', this will contain the value 1502 returned from the command 1503 - if status is 'failed' or 'pending', this will contain 1504 a dict containing the response from node 1505 - NodeIdentifier - one of name, identity or IP:port for the desired peer 1506 """ 1507 1508 return self._submitCmd("__global", "RemovePeer", **kw)
1509 1510 #@-node:removepeer 1511 #@-others 1512 1513 #@-node:FCP Primitives 1514 #@+node:Namesite primitives 1515 # methods for namesites 1516 1517 #@+others 1518 #@+node:namesiteInit
1519 - def namesiteInit(self, path):
1520 """ 1521 Initialise the namesites layer and load our namesites list 1522 """ 1523 if path: 1524 self.namesiteFile = path 1525 else: 1526 self.namesiteFile = os.path.join(os.path.expanduser("~"), ".freenames") 1527 1528 self.namesiteLocals = [] 1529 self.namesitePeers = [] 1530 1531 # create empty file 1532 if os.path.isfile(self.namesiteFile): 1533 self.namesiteLoad() 1534 else: 1535 self.namesiteSave()
1536 1537 #@-node:namesiteInit 1538 #@+node:namesiteLoad
1539 - def namesiteLoad(self):
1540 """ 1541 """ 1542 try: 1543 parser = pseudopythonparser.Parser() 1544 env = parser.parse(file(self.namesiteFile).read()) 1545 self.namesiteLocals = env['locals'] 1546 self.namesitePeers = env['peers'] 1547 except: 1548 traceback.print_exc() 1549 env = {}
1550 1551 #@-node:namesiteLoad 1552 #@+node:namesiteSave
1553 - def namesiteSave(self):
1554 """ 1555 Save the namesites list 1556 """ 1557 f = file(self.namesiteFile, "w") 1558 1559 f.write("# pyFreenet namesites registration file\n\n") 1560 1561 pp = pprint.PrettyPrinter(width=72, indent=2, stream=f) 1562 1563 f.write("locals = ") 1564 pp.pprint(self.namesiteLocals) 1565 f.write("\n") 1566 1567 f.write("peers = ") 1568 pp.pprint(self.namesitePeers) 1569 f.write("\n") 1570 1571 f.close()
1572 1573 #@-node:namesiteSave 1574 #@+node:namesiteAddLocal
1575 - def namesiteAddLocal(self, name, privuri=None):
1576 """ 1577 Create a new nameservice that we own 1578 """ 1579 if not privuri: 1580 privuri = self.genkey()[1] 1581 puburi = self.invertprivate(privuri) 1582 1583 privuri = self.namesiteProcessUri(privuri) 1584 puburi = self.namesiteProcessUri(puburi) 1585 1586 for rec in self.namesiteLocals: 1587 if rec['name'] == name: 1588 raise Exception("Already got a local service called '%s'" % name) 1589 1590 self.namesiteLocals.append( 1591 {'name':name, 1592 'privuri':privuri, 1593 'puburi': puburi, 1594 'cache': {}, 1595 }) 1596 1597 self.namesiteSave()
1598 1599 #@-node:namesiteAddLocal 1600 #@+node:namesiteDelLocal
1601 - def namesiteDelLocal(self, name):
1602 """ 1603 Delete a local nameservice 1604 """ 1605 rec = None 1606 for r in self.namesiteLocals: 1607 if r['name'] == name: 1608 self.namesiteLocals.remove(r) 1609 1610 self.namesiteSave()
1611 1612 #@-node:namesiteDelLocal 1613 #@+node:namesiteAddRecord
1614 - def namesiteAddRecord(self, localname, domain, uri):
1615 """ 1616 Adds a (domainname -> uri) record to one of our local 1617 services 1618 """ 1619 rec = None 1620 for r in self.namesiteLocals: 1621 if r['name'] == localname: 1622 rec = r 1623 if not rec: 1624 raise Exception("No local service '%s'" % localname) 1625 1626 cache = rec['cache'] 1627 1628 # bail if domain is known and is pointing to same uri 1629 if cache.get(domain, None) == uri: 1630 return 1631 1632 # domain is new, or uri has changed 1633 cache[domain] = uri 1634 1635 # save local records 1636 self.namesiteSave() 1637 1638 # determine the insert uri 1639 localPrivUri = rec['privuri'] + "/" + domain + "/0" 1640 1641 # and stick it in, via global queue 1642 id = "namesite|%s|%s|%s" % (localname, domain, int(time.time())) 1643 self.put( 1644 localPrivUri, 1645 id=id, 1646 data=uri, 1647 persistence="forever", 1648 Global=True, 1649 priority=0, 1650 async=True, 1651 ) 1652 1653 self.refreshPersistentRequests()
1654 1655 #@-node:namesiteAddRecord 1656 #@+node:namesiteDelRecord
1657 - def namesiteDelRecord(self, localname, domain):
1658 """ 1659 Removes a domainname record from one of our local 1660 services 1661 """ 1662 rec = None 1663 for r in self.namesiteLocals: 1664 if r['name'] == localname: 1665 if domain in r['cache']: 1666 del r['cache'][domain] 1667 1668 self.namesiteSave()
1669 1670 #@-node:namesiteDelRecord 1671 #@+node:namesiteAddPeer
1672 - def namesiteAddPeer(self, name, uri):
1673 """ 1674 Adds a namesite to our list 1675 """ 1676 # process URI 1677 uri = uri.split("freenet:")[-1] 1678 1679 # validate uri TODO reject private uris 1680 if not uri.startswith("USK"): 1681 raise Exception("Invalid URI %s, should be a public USK" % uri) 1682 1683 # just uplift the public key part, remove path 1684 uri = uri.split("freenet:")[-1] 1685 uri = uri.split("/")[0] 1686 1687 if self.namesiteHasPeer(name): 1688 raise Exception("Peer nameservice '%s' already exists" % name) 1689 1690 self.namesitePeers.append({'name':name, 'puburi':uri}) 1691 1692 self.namesiteSave()
1693 1694 #@-node:namesiteAddPeer 1695 #@+node:namesiteHasPeer
1696 - def namesiteHasPeer(self, name):
1697 """ 1698 returns True if we have a peer namesite of given name 1699 """ 1700 return self.namesiteGetPeer(name) is not None
1701 1702 #@-node:namesiteHasPeer 1703 #@+node:namesiteGetPeer
1704 - def namesiteGetPeer(self, name):
1705 """ 1706 returns record for given peer 1707 """ 1708 for rec in self.namesitePeers: 1709 if rec['name'] == name: 1710 return rec 1711 return None
1712 1713 #@-node:namesiteGetPeer 1714 #@+node:namesiteRemovePeer
1715 - def namesiteRemovePeer(self, name):
1716 """ 1717 Removes a namesite from our list 1718 """ 1719 for rec in self.namesitePeers: 1720 if rec['name'] == name: 1721 self.namesitePeers.remove(rec) 1722 1723 self.namesiteSave()
1724 1725 #@-node:namesiteRemovePeer 1726 #@+node:namesiteLookup
1727 - def namesiteLookup(self, domain, **kw):
1728 """ 1729 Attempts a lookup of a given 'domain name' on our designated 1730 namesites 1731 1732 Arguments: 1733 - domain - the domain to look up 1734 1735 Keywords: 1736 - localonly - whether to only search local cache 1737 - peer - if given, search only that peer's namesite (not locals) 1738 """ 1739 self.namesiteLoad() 1740 1741 localonly = kw.get('localonly', False) 1742 peer = kw.get('peer', None) 1743 1744 if not peer: 1745 # try local cache first 1746 for rec in self.namesiteLocals: 1747 if domain in rec['cache']: 1748 return rec['cache'][domain] 1749 1750 if localonly: 1751 return None 1752 1753 # the long step 1754 for rec in self.namesitePeers: 1755 1756 if peer and (peer != rec['name']): 1757 continue 1758 1759 uri = rec['puburi'] + "/" + domain + "/0" 1760 1761 try: 1762 mimetype, tgtUri = self.get(uri) 1763 return tgtUri 1764 except: 1765 pass 1766 1767 return None
1768 1769 #@-node:namesiteLookup 1770 #@+node:namesiteProcessUri
1771 - def namesiteProcessUri(self, uri):
1772 """ 1773 Reduces a URI 1774 """ 1775 # strip 'freenet:' 1776 uri1 = uri.split("freenet:")[-1] 1777 1778 # change SSK to USK, and split path 1779 uri1 = uri1.replace("SSK@", "USK@").split("/")[0] 1780 1781 # barf if bad uri 1782 if not uri1.startswith("USK@"): 1783 usage("Bad uri %s" % uri) 1784 1785 return uri1
1786 1787 #@-node:namesiteProcessUri 1788 #@-others 1789 1790 #@-node:Namesite primitives 1791 #@+node:Other High Level Methods 1792 # high level client methods 1793 1794 #@+others 1795 #@+node:listenGlobal
1796 - def listenGlobal(self, **kw):
1797 """ 1798 Enable listening on global queue 1799 """ 1800 self._submitCmd(None, "WatchGlobal", Enabled="true", **kw)
1801 1802 #@-node:listenGlobal 1803 #@+node:ignoreGlobal
1804 - def ignoreGlobal(self, **kw):
1805 """ 1806 Stop listening on global queue 1807 """ 1808 self._submitCmd(None, "WatchGlobal", Enabled="false", **kw)
1809 1810 #@-node:ignoreGlobal 1811 #@+node:purgePersistentJobs
1812 - def purgePersistentJobs(self):
1813 """ 1814 Cancels all persistent jobs in one go 1815 """ 1816 for job in self.getPersistentJobs(): 1817 job.cancel()
1818 1819 #@-node:purgePersistentJobs 1820 #@+node:getAllJobs
1821 - def getAllJobs(self):
1822 """ 1823 Returns a list of persistent jobs, excluding global jobs 1824 """ 1825 return self.jobs.values()
1826 1827 #@-node:getAllJobs 1828 #@+node:getPersistentJobs
1829 - def getPersistentJobs(self):
1830 """ 1831 Returns a list of persistent jobs, excluding global jobs 1832 """ 1833 return [j for j in self.jobs.values() if j.isPersistent and not j.isGlobal]
1834 1835 #@-node:getPersistentJobs 1836 #@+node:getGlobalJobs
1837 - def getGlobalJobs(self):
1838 """ 1839 Returns a list of global jobs 1840 """ 1841 return [j for j in self.jobs.values() if j.isGlobal]
1842 1843 #@-node:getGlobalJobs 1844 #@+node:getTransientJobs
1845 - def getTransientJobs(self):
1846 """ 1847 Returns a list of non-persistent, non-global jobs 1848 """ 1849 return [j for j in self.jobs.values() if not j.isPersistent]
1850 1851 #@-node:getTransientJobs 1852 #@+node:refreshPersistentRequests
1853 - def refreshPersistentRequests(self, **kw):
1854 """ 1855 Sends a ListPersistentRequests to node, to ensure that 1856 our records of persistent requests are up to date. 1857 1858 Since, upon connection, the node sends us a list of all 1859 outstanding persistent requests anyway, I can't really 1860 see much use for this method. I've only added the method 1861 for FCP spec compliance 1862 """ 1863 self._log(DETAIL, "listPersistentRequests") 1864 1865 if self.jobs.has_key('__global'): 1866 raise Exception("An existing non-identifier job is currently pending") 1867 1868 # --------------------------------- 1869 # format the request 1870 opts = {} 1871 1872 id = '__global' 1873 opts['Identifier'] = id 1874 1875 opts['async'] = kw.pop('async', False) 1876 if kw.has_key('callback'): 1877 opts['callback'] = kw['callback'] 1878 1879 # --------------------------------- 1880 # now enqueue the request 1881 return self._submitCmd(id, "ListPersistentRequests", **opts)
1882 1883 #@-node:refreshPersistentRequests 1884 #@+node:clearGlobalJob
1885 - def clearGlobalJob(self, id):
1886 """ 1887 Removes a job from the jobs queue 1888 """ 1889 self._submitCmd(id, "RemovePersistentRequest", 1890 Identifier=id, Global=True, async=True, waituntilsent=True)
1891 1892 #@-node:clearGlobalJob 1893 #@+node:setSocketTimeout
1894 - def getSocketTimeout(self):
1895 """ 1896 Gets the socketTimeout for future socket calls; 1897 returns None if not supported by Python version 1898 """ 1899 try: 1900 return self.socket.gettimeout() 1901 except Exception, e: 1902 # Socket timeout setting is not available until Python 2.3, so ignore exceptions 1903 pass 1904 return None
1905 1906 #@-node:setSocketTimeout 1907 #@+node:setSocketTimeout
1908 - def setSocketTimeout(self, socketTimeout):
1909 """ 1910 Sets the socketTimeout for future socket calls 1911 1912 >>> node = FCPNode() 1913 >>> timeout = node.getSocketTimeout() 1914 >>> newtimeout = 1800 1915 >>> node.setSocketTimeout(newtimeout) 1916 >>> node.getSocketTimeout() 1917 1800.0 1918 """ 1919 self.socketTimeout = socketTimeout 1920 try: 1921 self.socket.settimeout(self.socketTimeout) 1922 except Exception, e: 1923 # Socket timeout setting is not available until Python 2.3, so ignore exceptions 1924 pass
1925 1926 #@-node:setSocketTimeout 1927 #@+node:getVerbosity
1928 - def getVerbosity(self):
1929 """ 1930 Gets the verbosity for future logging calls 1931 1932 >>> node = FCPNode() 1933 >>> node.getVerbosity() # default 1934 3 1935 >>> node.setVerbosity(INFO) 1936 >>> node.getVerbosity() 1937 4 1938 """ 1939 return self.verbosity
1940 1941 #@-node:getVerbosity 1942 #@+node:setVerbosity
1943 - def setVerbosity(self, verbosity):
1944 """ 1945 Sets the verbosity for future logging calls 1946 """ 1947 self.verbosity = verbosity
1948 1949 #@-node:setVerbosity 1950 #@+node:shutdown
    def shutdown(self):
        """
        Terminates the manager thread

        You should explicitly shutdown any existing nodes
        before exiting your client application
        """
        log = self._log

        log(DETAIL, "shutdown: entered")
        # idempotent - a second call is a no-op
        if not self.running:
            log(DETAIL, "shutdown: already shut down")
            return

        # signal the manager thread's main loop to exit
        self.running = False

        # give the manager thread a chance to bail out
        time.sleep(pollTimeout * 3)

        # wait for mgr thread to quit - _mgrThread holds shutdownLock for
        # its whole lifetime and releases it on exit, so acquiring here
        # blocks until the thread has really finished
        log(DETAIL, "shutdown: waiting for manager thread to terminate")
        self.shutdownLock.acquire()
        log(DETAIL, "shutdown: manager thread terminated")

        # shut down FCP connection (unless the caller asked us to leave
        # the socket open via noCloseSocket)
        if hasattr(self, 'socket'):
            if not self.noCloseSocket:
                self.socket.close()
            del self.socket

        # and close the logfile, but only if it's a real file of our own
        # (never close stdout/stderr)
        if None != self.logfile and self.logfile not in [sys.stdout, sys.stderr]:
            self.logfile.close()

        log(DETAIL, "shutdown: done?")
1986 1987 #@-node:shutdown 1988 #@-others 1989 1990 1991 1992 #@-node:Other High Level Methods 1993 #@+node:Manager Thread 1994 # methods for manager thread 1995 1996 #@+others 1997 #@+node:_mgrThread
    def _mgrThread(self):
        """
        This thread is the nucleus of pyFreenet, and coordinates incoming
        client commands and incoming node responses

        It holds self.shutdownLock for its entire lifetime, so shutdown()
        can block on that lock to wait for this thread to finish.
        """
        log = self._log

        # held until this thread exits - see shutdown()
        self.shutdownLock.acquire()

        log(DETAIL, "FCPNode: manager thread starting")
        try:
            # main loop: alternate between draining the node socket and
            # the client request queue until shutdown() clears self.running
            while self.running:

                log(NOISY, "_mgrThread: Top of manager thread")

                # try for incoming messages from node
                log(NOISY, "_mgrThread: Testing for incoming message")
                if self._msgIncoming():
                    log(DEBUG, "_mgrThread: Retrieving incoming message")
                    msg = self._rxMsg()
                    log(DEBUG, "_mgrThread: Got incoming message, dispatching")
                    self._on_rxMsg(msg)
                    log(DEBUG, "_mgrThread: back from on_rxMsg")
                else:
                    log(NOISY, "_mgrThread: No incoming message from node")

                # try for incoming requests from clients; the blocking get
                # with pollTimeout doubles as this loop's pacing delay
                log(NOISY, "_mgrThread: Testing for client req")
                try:
                    req = self.clientReqQueue.get(True, pollTimeout)
                    log(DEBUG, "_mgrThread: Got client req, dispatching")
                    self._on_clientReq(req)
                    log(DEBUG, "_mgrThread: Back from on_clientReq")
                except Queue.Empty:
                    log(NOISY, "_mgrThread: No incoming client req")
                    pass

            self._log(DETAIL, "_mgrThread: Manager thread terminated normally")

        except Exception, e:
            # the manager crashed: every job would otherwise block forever,
            # so propagate the exception to all of them
            traceback.print_exc()
            self._log(CRITICAL, "_mgrThread: manager thread crashed")

            # send the exception to all waiting jobs
            for id, job in self.jobs.items():
                job._putResult(e)

            # send the exception to all queued jobs, draining the queue
            # until it stays empty for a full pollTimeout
            while True:
                try:
                    job = self.clientReqQueue.get(True, pollTimeout)
                    job._putResult(e)
                except Queue.Empty:
                    log(NOISY, "_mgrThread: No incoming client req")
                    break

        # unblocks any shutdown() call waiting for us
        self.shutdownLock.release()
2055 2056 #@-node:_mgrThread 2057 #@+node:_msgIncoming
2058 - def _msgIncoming(self):
2059 """ 2060 Returns True if a message is coming in from the node 2061 """ 2062 return len(select.select([self.socket], [], [], pollTimeout)[0]) > 0
2063 2064 #@-node:_msgIncoming 2065 #@+node:_submitCmd
    def _submitCmd(self, id, cmd, **kw):
        """
        Submits a command for execution

        Arguments:
            - id - the command identifier
            - cmd - the command name, such as 'ClientPut'

        Keywords:
            - async - whether to return a JobTicket object, rather than
              the command result
            - callback - a function taking 2 args 'status' and 'value'.
              Status is one of 'successful', 'pending' or 'failed'.
              value is the primitive return value if successful, or the raw
              node message if pending or failed
            - followRedirect - follow a redirect if true, otherwise fail the get
            - rawcmd - a raw command buffer to send directly
            - options specific to command such as 'URI'
            - timeout - timeout in seconds for job completion, default 1 year
            - waituntilsent - whether to block until this command has been sent
              to the node, default False
            - keep - whether to keep the job on our jobs list after it completes,
              default False
            - kwdict - an optional dict whose entries are merged into the
              FCP message fields
            - stream - optional stream object for received data

        Returns:
            - if command is sent in sync mode, returns the result
            - if command is sent in async mode, returns a JobTicket
              object which the client can poll or block on later
        """
        # refuse to queue work onto a dead connection
        if not self.nodeIsAlive:
            raise FCPNodeFailure("%s:%s: node closed connection" % (cmd, id))

        log = self._log

        log(DEBUG, "_submitCmd: id=" + repr(id) + ", cmd=" + repr(cmd) + ", **" + repr(kw))

        # strip the client-side control keywords; everything left in kw
        # becomes fields of the FCP message itself
        async = kw.pop('async', False)
        followRedirect = kw.pop('followRedirect', True)
        stream = kw.pop('stream', None)
        waituntilsent = kw.pop('waituntilsent', False)
        keepjob = kw.pop('keep', False)
        timeout = kw.pop('timeout', ONE_YEAR)
        # flatten an optional 'kwdict' argument into the message fields
        if( kw.has_key( "kwdict" )):
            kwdict = kw[ "kwdict" ]
            del kw[ "kwdict" ]
            for key in kwdict.keys():
                kw[ key ] = kwdict[ key ]
        # the JobTicket tracks this command through the manager thread
        job = JobTicket(
            self, id, cmd, kw,
            verbosity=self.verbosity, logger=self._log, keep=keepjob,
            stream=stream)

        log(DEBUG, "_submitCmd: timeout=%s" % timeout)

        job.followRedirect = followRedirect

        if cmd == 'ClientGet' and 'URI' in kw:
            job.uri = kw['URI']

        if cmd == 'ClientPut':
            # NOTE(review): assumes every ClientPut caller supplies
            # Metadata.ContentType - raises KeyError otherwise; confirm
            job.mimetype = kw['Metadata.ContentType']

        # hand the job to the manager thread for transmission to the node
        self.clientReqQueue.put(job)

        if async:
            if waituntilsent:
                job.waitTillReqSent()
            return job
        elif cmd in ['WatchGlobal', "RemovePersistentRequest"]:
            # fire-and-forget commands: nothing to wait for
            return
        else:
            # synchronous mode: block until the node finishes the job
            log(DETAIL, "Waiting on job")
            return job.wait(timeout)
2155 2156 #@-node:_submitCmd 2157 #@+node:_on_clientReq
2158 - def _on_clientReq(self, job):
2159 """ 2160 takes an incoming request job from client and transmits it to 2161 the fcp port, and also registers it so the manager thread 2162 can action responses from the fcp port. 2163 """ 2164 id = job.id 2165 cmd = job.cmd 2166 kw = job.kw 2167 2168 # register the req 2169 if cmd != 'WatchGlobal': 2170 self.jobs[id] = job 2171 self._log(DEBUG, "_on_clientReq: cmd=%s id=%s lock=%s" % ( 2172 cmd, repr(id), job.lock)) 2173 2174 # now can send, since we're the only one who will 2175 self._txMsg(cmd, **kw) 2176 2177 job.timeQueued = int(time.time()) 2178 2179 job.reqSentLock.release()
2180 2181 #@-node:_on_clientReq 2182 #@+node:_on_rxMsg
2183 - def _on_rxMsg(self, msg):
2184 """ 2185 Handles incoming messages from node 2186 2187 If an incoming message represents the termination of a command, 2188 the job ticket object will be notified accordingly 2189 """ 2190 log = self._log 2191 2192 # find the job this relates to 2193 id = msg.get('Identifier', '__global') 2194 2195 hdr = msg['header'] 2196 2197 job = self.jobs.get(id, None) 2198 if not job: 2199 # we have a global job and/or persistent job from last connection 2200 log(DETAIL, "***** Got %s from unknown job id %s" % (hdr, repr(id))) 2201 job = JobTicket(self, id, hdr, msg) 2202 self.jobs[id] = job 2203 2204 # action from here depends on what kind of message we got 2205 2206 # ----------------------------- 2207 # handle GenerateSSK responses 2208 2209 if hdr == 'SSKKeypair': 2210 # got requested keys back 2211 keys = (msg['RequestURI'], msg['InsertURI']) 2212 job.callback('successful', keys) 2213 job._putResult(keys) 2214 2215 # and remove job from queue 2216 self.jobs.pop(id, None) 2217 return 2218 2219 # ----------------------------- 2220 # handle ClientGet responses 2221 2222 if hdr == 'DataFound': 2223 if( job.kw.has_key( 'URI' )): 2224 log(INFO, "Got DataFound for URI=%s" % job.kw['URI']) 2225 else: 2226 log(ERROR, "Got DataFound without URI") 2227 mimetype = msg['Metadata.ContentType'] 2228 if job.kw.has_key('Filename'): 2229 # already stored to disk, done 2230 #resp['file'] = file 2231 result = (mimetype, job.kw['Filename'], msg) 2232 job.callback('successful', result) 2233 job._putResult(result) 2234 return 2235 2236 elif job.kw['ReturnType'] == 'none': 2237 result = (mimetype, 1, msg) 2238 job.callback('successful', result) 2239 job._putResult(result) 2240 return 2241 2242 # otherwise, we're expecting an AllData and will react to it then 2243 else: 2244 # is this a persistent get? 
2245 if job.kw['ReturnType'] == 'direct' \ 2246 and job.kw.get('Persistence', None) != 'connection': 2247 # gotta poll for request status so we can get our data 2248 # FIXME: this is a hack, clean it up 2249 log(INFO, "Request was persistent") 2250 if not hasattr(job, "gotPersistentDataFound"): 2251 if job.isGlobal: 2252 isGlobal = "true" 2253 else: 2254 isGlobal = "false" 2255 job.gotPersistentDataFound = True 2256 log(INFO, " --> sending GetRequestStatus") 2257 self._txMsg("GetRequestStatus", 2258 Identifier=job.kw['Identifier'], 2259 Persistence=msg.get("Persistence", "connection"), 2260 Global=isGlobal, 2261 ) 2262 2263 job.callback('pending', msg) 2264 job.mimetype = mimetype 2265 return 2266 2267 if hdr == 'CompatibilityMode': 2268 # information, how to insert the file to make it an exact match. 2269 # TODO: Use the information. 2270 job.callback('pending', msg) 2271 return 2272 2273 if hdr == 'ExpectedMIME': 2274 # information, how to insert the file to make it an exact match. 2275 # TODO: Use the information. 2276 mimetype = msg['Metadata.ContentType'] 2277 job.mimetype = mimetype 2278 job.callback('pending', msg) 2279 return 2280 2281 if hdr == 'ExpectedDataLength': 2282 # The expected filesize. 2283 # TODO: Use the information. 
2284 size = msg['DataLength'] 2285 job.callback('pending', msg) 2286 return 2287 2288 if hdr == 'AllData': 2289 result = (job.mimetype, msg['Data'], msg) 2290 job.callback('successful', result) 2291 job._putResult(result) 2292 return 2293 2294 if hdr == 'GetFailed': 2295 # see if it's just a redirect problem 2296 if job.followRedirect and msg.get('ShortCodeDescription', None) == "New URI": 2297 uri = msg['RedirectURI'] 2298 job.kw['URI'] = uri 2299 job.kw['id'] = self._getUniqueId(); 2300 self._txMsg(job.cmd, **job.kw) 2301 log(DETAIL, "Redirect to %s" % uri) 2302 return 2303 # see if it's just a TOO_MANY_PATH_COMPONENTS redirect 2304 if job.followRedirect and msg.get('ShortCodeDescription', None) == "Too many path components": 2305 uri = msg['RedirectURI'] 2306 job.kw['URI'] = uri 2307 job.kw['id'] = self._getUniqueId(); 2308 self._txMsg(job.cmd, **job.kw) 2309 log(DETAIL, "Redirect to %s" % uri) 2310 return 2311 2312 # return an exception 2313 job.callback("failed", msg) 2314 job._putResult(FCPGetFailed(msg)) 2315 return 2316 2317 # ----------------------------- 2318 # handle ClientPut responses 2319 2320 if hdr == 'URIGenerated': 2321 if 'URI' not in msg: 2322 log(ERROR, "message {} without 'URI'. This is very likely a bug in Freenet. Check whether you have files in uploads or downloads without URI (clickable link).".format(hdr)) 2323 else: 2324 job.uri = msg['URI'] 2325 newUri = msg['URI'] 2326 job.callback('pending', msg) 2327 2328 return 2329 2330 # bail here if no data coming back 2331 if job.kw.get('GetCHKOnly', False) == 'true': 2332 # done - only wanted a CHK 2333 job._putResult(newUri) 2334 return 2335 2336 if hdr == 'PutSuccessful': 2337 if 'URI' not in msg: 2338 log(ERROR, "message {} without 'URI'. This is very likely a bug in Freenet. 
Check whether you have files in uploads or downloads without URI (clickable link).".format(hdr)) 2339 else: 2340 result = msg['URI'] 2341 job._putResult(result) 2342 job.callback('successful', result) 2343 # print "*** PUTSUCCESSFUL" 2344 return 2345 2346 if hdr == 'PutFailed': 2347 job.callback('failed', msg) 2348 job._putResult(FCPPutFailed(msg)) 2349 return 2350 2351 if hdr == 'PutFetchable': 2352 if 'URI' not in msg: 2353 log(ERROR, "message {} without 'URI'. This is very likely a bug in Freenet. Check whether you have files in uploads or downloads without URI (clickable link).".format(hdr)) 2354 else: 2355 uri = msg['URI'] 2356 job.kw['URI'] = uri 2357 job.callback('pending', msg) 2358 return 2359 2360 # ----------------------------- 2361 # handle ConfigData 2362 if hdr == 'ConfigData': 2363 # return all the data recieved 2364 job.callback('successful', msg) 2365 job._putResult(msg) 2366 2367 # remove job from queue 2368 self.jobs.pop(id, None) 2369 return 2370 2371 # ----------------------------- 2372 # handle progress messages 2373 2374 if hdr == 'StartedCompression': 2375 job.callback('pending', msg) 2376 return 2377 2378 if hdr == 'FinishedCompression': 2379 job.callback('pending', msg) 2380 return 2381 2382 if hdr == 'SimpleProgress': 2383 job.callback('pending', msg) 2384 return 2385 2386 if hdr == 'SendingToNetwork': 2387 job.callback('pending', msg) 2388 return 2389 2390 if hdr == 'ExpectedHashes': 2391 # The hashes the file must have. 2392 # TODO: Use the information. 
2393 sha256 = msg['Hashes.SHA256'] 2394 job.callback('pending', msg) 2395 return 2396 2397 2398 # ----------------------------- 2399 # handle FCPPluginMessage replies 2400 2401 if hdr == 'FCPPluginReply': 2402 job._appendMsg(msg) 2403 job.callback('successful', job.msgs) 2404 job._putResult(job.msgs) 2405 return 2406 2407 # ----------------------------- 2408 # handle peer management messages 2409 2410 if hdr == 'EndListPeers': 2411 job._appendMsg(msg) 2412 job.callback('successful', job.msgs) 2413 job._putResult(job.msgs) 2414 return 2415 2416 if hdr == 'Peer': 2417 if(job.cmd == "ListPeers"): 2418 job.callback('pending', msg) 2419 job._appendMsg(msg) 2420 else: 2421 job.callback('successful', msg) 2422 job._putResult(msg) 2423 return 2424 2425 if hdr == 'PeerRemoved': 2426 job._appendMsg(msg) 2427 job.callback('successful', job.msgs) 2428 job._putResult(job.msgs) 2429 return 2430 2431 if hdr == 'UnknownNodeIdentifier': 2432 job._appendMsg(msg) 2433 job.callback('failed', job.msgs) 2434 job._putResult(job.msgs) 2435 return 2436 2437 # ----------------------------- 2438 # handle peer note management messages 2439 2440 if hdr == 'EndListPeerNotes': 2441 job._appendMsg(msg) 2442 job.callback('successful', job.msgs) 2443 job._putResult(job.msgs) 2444 2445 2446 2447 2448 return 2449 2450 if hdr == 'PeerNote': 2451 if(job.cmd == "ListPeerNotes"): 2452 job.callback('pending', msg) 2453 job._appendMsg(msg) 2454 else: 2455 job.callback('successful', msg) 2456 job._putResult(msg) 2457 return 2458 2459 if hdr == 'UnknownPeerNoteType': 2460 job._appendMsg(msg) 2461 job.callback('failed', job.msgs) 2462 job._putResult(job.msgs) 2463 return 2464 2465 # ----------------------------- 2466 # handle persistent job messages 2467 2468 if hdr == 'PersistentGet': 2469 job.callback('pending', msg) 2470 job._appendMsg(msg) 2471 return 2472 2473 if hdr == 'PersistentPut': 2474 job.callback('pending', msg) 2475 job._appendMsg(msg) 2476 return 2477 2478 if hdr == 'PersistentPutDir': 2479 
job.callback('pending', msg) 2480 job._appendMsg(msg) 2481 return 2482 2483 if hdr == 'EndListPersistentRequests': 2484 job._appendMsg(msg) 2485 job.callback('successful', job.msgs) 2486 job._putResult(job.msgs) 2487 return 2488 2489 if hdr == 'PersistentRequestRemoved': 2490 if self.jobs.has_key(id): 2491 del self.jobs[id] 2492 return 2493 2494 # ----------------------------- 2495 # handle USK Subscription , thanks to Enzo Matrix 2496 2497 # Note from Enzo Matrix: I just needed the messages to get 2498 # passed through to the job, and have its callback function 2499 # called so I can do something when a USK gets updated. I 2500 # handle the checking whether the message was a 2501 # SubscribedUSKUpdate in the callback, which is defined in the 2502 # spider. 2503 if hdr == 'SubscribedUSK': 2504 job.callback('successful', msg) 2505 return 2506 2507 if hdr == 'SubscribedUSKUpdate': 2508 job.callback('successful', msg) 2509 return 2510 2511 if hdr == 'SubscribedUSKRoundFinished': 2512 job.callback('successful', msg) 2513 return 2514 2515 if hdr == 'SubscribedUSKSendingToNetwork': 2516 job.callback('successful', msg) 2517 return 2518 2519 # ----------------------------- 2520 # handle testDDA messages 2521 2522 if hdr == 'TestDDAReply': 2523 # return all the data recieved 2524 job.callback('successful', msg) 2525 job._putResult(msg) 2526 2527 # remove job from queue 2528 self.jobs.pop(id, None) 2529 return 2530 2531 if hdr == 'TestDDAComplete': 2532 # return all the data recieved 2533 job.callback('successful', msg) 2534 job._putResult(msg) 2535 2536 # remove job from queue 2537 self.jobs.pop(id, None) 2538 return 2539 2540 # ----------------------------- 2541 # handle NodeData 2542 if hdr == 'NodeData': 2543 # return all the data recieved 2544 job.callback('successful', msg) 2545 job._putResult(msg) 2546 2547 # remove job from queue 2548 self.jobs.pop(id, None) 2549 return 2550 2551 # ----------------------------- 2552 # handle various errors 2553 2554 if hdr == 
'ProtocolError': 2555 job.callback('failed', msg) 2556 job._putResult(FCPProtocolError(msg)) 2557 return 2558 2559 if hdr == 'IdentifierCollision': 2560 log(ERROR, "IdentifierCollision on id %s ???" % id) 2561 job.callback('failed', msg) 2562 job._putResult(Exception("Duplicate job identifier %s" % id)) 2563 return 2564 2565 # Ignore informational headers (since 1254) 2566 if hdr == 'ExpectedHashes' or hdr == 'CompatibilityMode': 2567 return 2568 2569 # ----------------------------- 2570 # wtf is happening here?!? 2571 2572 log(ERROR, "Unknown message type from node: %s" % hdr) 2573 job.callback('failed', msg) 2574 job._putResult(FCPException(msg)) 2575 return
2576 #@-node:_on_rxMsg 2577 #@-others 2578 2579 #@-node:Manager Thread 2580 #@+node:Low Level Methods 2581 # low level noce comms methods 2582 2583 #@+others 2584 #@+node:_hello
2585 - def _hello(self):
2586 """ 2587 perform the initial FCP protocol handshake 2588 """ 2589 self._txMsg("ClientHello", 2590 Name=self.name, 2591 ExpectedVersion=expectedVersion) 2592 2593 resp = self._rxMsg() 2594 if(resp.has_key("Version")): 2595 self.nodeVersion = resp[ "Version" ]; 2596 if(resp.has_key("FCPVersion")): 2597 self.nodeFCPVersion = resp[ "FCPVersion" ]; 2598 if(resp.has_key("Build")): 2599 try: 2600 self.nodeBuild = int( resp[ "Build" ] ); 2601 except Exception, msg: 2602 pass; 2603 else: 2604 nodeVersionFields = self.nodeVersion.split( "," ); 2605 if( len( nodeVersionFields ) == 4 ): 2606 try: 2607 self.nodeBuild = int( nodeVersionFields[ 3 ] ); 2608 except Exception, msg: 2609 pass; 2610 if(resp.has_key("Revision")): 2611 try: 2612 self.nodeRevision = int( resp[ "Revision" ] ); 2613 except Exception, msg: 2614 pass; 2615 if(resp.has_key("ExtBuild")): 2616 try: 2617 self.nodeExtBuild = int( resp[ "ExtBuild" ] ); 2618 except Exception, msg: 2619 pass; 2620 if(resp.has_key("Revision")): 2621 try: 2622 self.nodeExtRevision = int( resp[ "ExtRevision" ] ); 2623 except Exception, msg: 2624 pass; 2625 if(resp.has_key("Testnet")): 2626 if( "true" == resp[ "Testnet" ] ): 2627 self.nodeIsTestnet = True; 2628 else: 2629 self.nodeIsTestnet = False; 2630 if(resp.has_key("ConnectionIdentifier")): 2631 self.connectionidentifier = resp[ "ConnectionIdentifier" ] 2632 try: 2633 self.compressionCodecs = self._parseCompressionCodecs( 2634 resp [ "CompressionCodecs" ]) 2635 except (KeyError, IndexError, ValueError): 2636 pass 2637 2638 2639 return resp
2640 2641 #@-node:_hello 2642 #@+node:_parseCompressionCodecs
2643 - def _parseCompressionCodecs(self, CompressionCodecsString):
2644 """ 2645 Turn the CompressionCodecsString returned by the node into a list 2646 of name and number of the codec. 2647 2648 @param CompressionCodecsString: "3 - GZIP(0), BZIP2(1), LZMA(2)" 2649 @return: [(name, number), ...] 2650 2651 """ 2652 return [(name, int(number[:-1])) 2653 for name, number 2654 in [i.split("(") 2655 for i in CompressionCodecsString.split( 2656 " - ")[1].split(", ")]]
2657 #@-node:_parseCompressionCodecs 2658 #@+node:defaultCompressionCodecsString
2660 """ 2661 Turn the CompressionCodecs into a string accepted by the node. 2662 2663 @param CompressionCodecs: [(name, number), ...] 2664 @return: "GZIP, BZIP2, LZMA" (example) 2665 2666 """ 2667 return ", ".join([name for name, num in self.compressionCodecs])
2668 #@-node:defaultCompressionCodecsString 2669 #@+node:_getUniqueId
2670 - def _getUniqueId(self):
2671 """ 2672 Allocate a unique ID for a request 2673 """ 2674 timenum = int( time.time() * 1000000 ); 2675 randnum = random.randint( 0, timenum ); 2676 return "id" + str( timenum + randnum );
2677 2678 #@-node:_getUniqueId 2679 #@+node:_txMsg
2680 - def _txMsg(self, msgType, **kw):
2681 """ 2682 low level message send 2683 2684 Arguments: 2685 - msgType - one of the FCP message headers, such as 'ClientHello' 2686 - args - zero or more (keyword, value) tuples 2687 Keywords: 2688 - rawcmd - if given, this is the raw buffer to send 2689 - other keywords depend on the value of msgType 2690 """ 2691 log = self._log 2692 2693 # just send the raw command, if given 2694 rawcmd = kw.get('rawcmd', None) 2695 if rawcmd: 2696 self.socket.sendall(rawcmd) 2697 log(DETAIL, "CLIENT: %s" % rawcmd) 2698 return 2699 2700 if kw.has_key("Data"): 2701 data = kw.pop("Data") 2702 sendEndMessage = False 2703 else: 2704 data = None 2705 sendEndMessage = True 2706 2707 items = [msgType + "\n"] 2708 log(DETAIL, "CLIENT: %s" % msgType) 2709 2710 #print "CLIENT: %s" % msgType 2711 for k, v in kw.items(): 2712 #print "CLIENT: %s=%s" % (k,v) 2713 line = k + "=" + str(v) 2714 items.append(line + "\n") 2715 log(DETAIL, "CLIENT: %s" % line) 2716 2717 if data != None: 2718 items.append("DataLength=%d\n" % len(data)) 2719 log(DETAIL, "CLIENT: DataLength=%d" % len(data)) 2720 items.append("Data\n") 2721 log(DETAIL, "CLIENT: ...data...") 2722 items.append(data) 2723 2724 #print "sendEndMessage=%s" % sendEndMessage 2725 2726 if sendEndMessage: 2727 items.append("EndMessage\n") 2728 log(DETAIL, "CLIENT: EndMessage") 2729 raw = "".join(items) 2730 2731 self.socket.sendall(raw)
2732 2733 #@-node:_txMsg 2734 #@+node:_rxMsg
    def _rxMsg(self):
        """
        Receives and returns a message as a dict

        The header keyword is included as key 'header'.

        Values that look like integers are converted to int; a 'Data'
        payload (when present) is stored under key 'Data', or streamed
        to the owning job's stream (in which case items['Data'] is None).

        Raises FCPNodeFailure if the node closes the socket mid-read.
        """
        log = self._log

        log(DETAIL, "NODE: ----------------------------")

        # shorthand, for reading n bytes
        def read(n):
            if n > 1:
                log(DEBUG, "read: want %d bytes" % n)
            chunks = []
            remaining = n
            # loop because recv() may return fewer bytes than requested
            while remaining > 0:
                chunk = self.socket.recv(remaining)
                chunklen = len(chunk)
                if chunk:
                    chunks.append(chunk)
                else:
                    # empty recv means the peer closed the connection
                    self.nodeIsAlive = False
                    raise FCPNodeFailure("FCP socket closed by node")
                remaining -= chunklen
                if remaining > 0:
                    if n > 1:
                        log(DEBUG,
                            "wanted %s, got %s still need %s bytes" % (n, chunklen, remaining)
                            )
                        pass
            buf = "".join(chunks)
            return buf

        # read a line, one byte at a time, up to and including the '\n'
        def readln():
            buf = []
            while True:
                c = read(1)
                buf.append(c)
                if c == '\n':
                    break
            ln = "".join(buf)
            log(DETAIL, "NODE: " + ln[:-1])
            return ln

        items = {}

        # read the header line (skipping any leading blank lines)
        while True:
            line = readln().strip()
            if line:
                items['header'] = line
                break

        # read the body
        while True:
            line = readln().strip()
            # 'End'/'EndMessage' terminates a payload-less message
            if line in ['End', 'EndMessage']:
                break

            if line == 'Data':
                # read the following data

                # try to locate job - DataLength/Identifier fields must
                # already have been parsed from earlier lines
                id = items['Identifier']
                job = self.jobs[id]
                if job.stream:
                    # loop to transfer from socket to stream
                    remaining = items['DataLength']
                    stream = job.stream
                    while remaining > 0:
                        buf = self.socket.recv(remaining)
                        stream.write(buf)
                        stream.flush()
                        remaining -= len(buf)
                    items['Data'] = None
                else:
                    buf = read(items['DataLength'])
                    items['Data'] = buf
                    log(DETAIL, "NODE: ...<%d bytes of data>" % len(buf))
                # the payload is always the last element of a message
                break
            else:
                # it's a normal 'key=val' pair
                try:
                    k, v = line.split("=", 1)
                except:
                    log(ERROR, "_rxMsg: barfed splitting '%s'" % repr(line))
                    raise

                # attempt int conversion
                try:
                    v = int(v)
                except:
                    pass

                items[k] = v

        # all done
        return items

    #@-node:_rxMsg
    #@+node:_log
2838 - def _log(self, level, msg):
2839 """ 2840 Logs a message. If level > verbosity, don't output it 2841 """ 2842 if level > self.verbosity: 2843 return 2844 2845 if(None != self.logfile): 2846 if not msg.endswith("\n"): 2847 msg += "\n" 2848 self.logfile.write(msg) 2849 self.logfile.flush() 2850 if(None != self.logfunc): 2851 while( msg.endswith("\n") ): 2852 msg = msg[ : -1 ] 2853 msglines = msg.split("\n") 2854 for msgline in msglines: 2855 self.logfunc(msgline)
2856 2857 #@-node:_log 2858 #@-others 2859 #@-node:Low Level Methods 2860 #@-others 2861 2862 2863 #@-node:class FCPNode 2864 #@+node:class JobTicket
class JobTicket:
    """
    A JobTicket is an object returned to clients making
    asynchronous requests. It puts them in control of how
    they manage n concurrent requests.

    When you as a client receive a JobTicket, you can choose to:
        - block, awaiting completion of the job
        - poll the job for completion status
        - receive a callback upon completion

    Attributes of interest:
        - isPersistent - True if job is persistent
        - isGlobal - True if job is global
        - followRedirect - follow a redirect if true, otherwise fail the get
        - value - value returned upon completion, or None if not complete
        - node - the node this job belongs to
        - id - the job Identifier
        - cmd - the FCP message header word
        - kw - the keywords in the FCP header
        - msgs - any messages received from node in connection
          to this job
    """
    #@    @+others
    #@+node:__init__
    def __init__(self, node, id, cmd, kw, **opts):
        """
        You should never instantiate a JobTicket object yourself
        """
        self.node = node
        self.id = id
        self.cmd = cmd

        self.verbosity = opts.get('verbosity', ERROR)
        self._log = opts.get('logger', self.defaultLogger)
        self.keep = opts.get('keep', False)
        self.stream = opts.get('stream', None)
        self.followRedirect = opts.get('followRedirect', False)

        # a job is persistent unless both persistence keywords say
        # 'connection'
        if kw.get("Persistent", "connection") != "connection" \
        or kw.get("PersistenceType", "connection") != "connection":
            self.isPersistent = True
        else:
            self.isPersistent = False

        self.isGlobal = kw.get('Global', 'false') == 'true'

        self.kw = kw

        self.msgs = []

        # a user-supplied callback shadows the no-op default below
        callback = kw.pop('callback', None)
        if callback:
            self.callback = callback

        self.timeout = int(kw.pop('timeout', 86400*365))
        self.timeQueued = int(time.time())
        self.timeSent = None

        # held while the job is incomplete; _putResult releases it
        self.lock = threading.Lock()
        self.lock.acquire()
        self.result = None

        # held until the manager thread has transmitted the request
        self.reqSentLock = threading.Lock()
        self.reqSentLock.acquire()

    #@-node:__init__
    #@+node:isComplete
    def isComplete(self):
        """
        Returns True if the job has been completed
        """
        # was 'self.result != None'; identity test is the correct idiom
        return self.result is not None

    #@-node:isComplete
    #@+node:wait
    def wait(self, timeout=None):
        """
        Waits forever (or for a given timeout) for a job to complete

        Raises FCPSendTimeout if the command was never sent within the
        timeout, FCPNodeTimeout if the node failed to respond in time.
        """
        log = self._log

        log(DEBUG, "wait:%s:%s: timeout=%ss" % (self.cmd, self.id, timeout))

        # wait forever for job to complete, if no timeout given
        if timeout is None:
            log(DEBUG, "wait:%s:%s: no timeout" % (self.cmd, self.id))
            while not self.lock.acquire(False):
                time.sleep(_pollInterval)
            self.lock.release()
            return self.getResult()

        # wait for timeout
        then = int(time.time())

        # ensure command has been sent, wait if not
        while not self.reqSentLock.acquire(False):

            # how long have we waited?
            elapsed = int(time.time()) - then

            # got any time left?
            if elapsed < timeout:
                # yep, patience remains
                time.sleep(_pollInterval)
                log(DEBUG, "wait:%s:%s: job not dispatched, timeout in %ss" % \
                    (self.cmd, self.id, timeout-elapsed))
                continue

            # no - timed out waiting for job to be sent to node
            log(DEBUG, "wait:%s:%s: timeout on send command" % (self.cmd, self.id))
            raise FCPSendTimeout(
                header="Command '%s' took too long to be sent to node" % self.cmd
                )

        log(DEBUG, "wait:%s:%s: job now dispatched" % (self.cmd, self.id))

        # wait now for node response
        while not self.lock.acquire(False):
            # how long have we waited?
            elapsed = int(time.time()) - then

            # got any time left?
            if elapsed < timeout:
                # yep, patience remains
                time.sleep(_pollInterval)

                if timeout < ONE_YEAR:
                    log(DEBUG, "wait:%s:%s: awaiting node response, timeout in %ss" % \
                        (self.cmd, self.id, timeout-elapsed))
                continue

            # no - timed out waiting for node to respond
            log(DEBUG, "wait:%s:%s: timeout on node response" % (self.cmd, self.id))
            raise FCPNodeTimeout(
                header="Command '%s' took too long for node response" % self.cmd
                )

        log(DEBUG, "wait:%s:%s: job complete" % (self.cmd, self.id))

        # if we get here, we got the lock, command completed
        self.lock.release()

        # and we have a result
        return self.getResult()

    #@-node:wait
    #@+node:waitTillReqSent
    def waitTillReqSent(self):
        """
        Waits till the request has been sent to node
        """
        self.reqSentLock.acquire()

    #@-node:waitTillReqSent
    #@+node:getResult
    def getResult(self):
        """
        Returns result of job, or None if job still not complete

        If result is an exception object, then raises it
        """
        if isinstance(self.result, Exception):
            raise self.result
        else:
            return self.result

    #@-node:getResult
    #@+node:callback
    def callback(self, status, value):
        """
        Default no-op callback; replaced per-instance whenever the
        client passes a 'callback' keyword (see __init__).
        """
        # no action needed

    #@-node:callback
    #@+node:cancel
    def cancel(self):
        """
        Cancels the job, if it is persistent

        Does nothing if the job was not persistent
        """
        if not self.isPersistent:
            return

        # remove from node's jobs lists
        try:
            del self.node.jobs[self.id]
        except KeyError:
            pass

        # send the cancel
        if self.isGlobal:
            isGlobal = "true"
        else:
            # BUGFIX: this used to send "False", which is not a valid
            # FCP boolean (the protocol expects lowercase "true"/"false")
            isGlobal = "false"

        self.node._txMsg("RemovePersistentRequest",
                         Global=isGlobal,
                         Identifier=self.id)

    #@-node:cancel
    #@+node:_appendMsg
    def _appendMsg(self, msg):
        # accumulate node messages relating to this job
        self.msgs.append(msg)

    #@-node:_appendMsg
    #@+node:_putResult
    def _putResult(self, result):
        """
        Called by manager thread to indicate job is complete,
        and submit a result to be picked up by client
        """
        self.result = result

        # transient jobs are dropped from the node's registry on completion
        if not (self.keep or self.isPersistent or self.isGlobal):
            try:
                del self.node.jobs[self.id]
            except KeyError:
                pass

        try:
            self.lock.release()
        except Exception:
            # releasing an already-released lock is harmless here
            pass

    #@-node:_putResult
    #@+node:__repr__
    def __repr__(self):
        if "URI" in self.kw:
            uri = " URI=%s" % self.kw['URI']
        else:
            uri = ""
        # BUGFIX: the closing '>' was missing from the repr string
        return "<FCP job %s:%s%s>" % (self.id, self.cmd, uri)

    #@-node:__repr__
    #@+node:defaultLogger
    def defaultLogger(self, level, msg):
        """
        Fallback logger: writes to stdout, honouring self.verbosity.
        """
        if level > self.verbosity:
            return

        if not msg.endswith("\n"):
            msg += "\n"

        sys.stdout.write(msg)
        sys.stdout.flush()
3125 3126 #@-node:defaultLogger 3127 #@-others 3128 3129 #@-node:class JobTicket 3130 #@+node:util funcs 3131 #@+others 3132 #@+node:toBool
def toBool(arg):
    """
    Coerce a value towards the FCP string booleans "true"/"false".

    - values convertible to a nonzero int -> "true"
    - strings: leading 't'/'T' (after strip/lower) -> "true",
      anything else (including empty string) -> "false"
    - everything else: plain Python bool of the value

    NOTE: the mixed return types ("true"/"false" strings vs True/False
    booleans) are preserved for backward compatibility with callers.
    """
    try:
        arg = int(arg)
        if arg:
            return "true"
        # falsy int falls through to the final bool() below
    except (TypeError, ValueError):
        pass

    if isinstance(arg, str):
        # BUGFIX: an empty/whitespace-only string used to raise
        # IndexError on the [0] lookup
        stripped = arg.strip().lower()
        if stripped and stripped[0] == 't':
            return "true"
        else:
            return "false"

    return bool(arg)
3151 3152 #@-node:toBool 3153 #@+node:readdir
def readdir(dirpath, prefix='', gethashes=False):
    """
    Reads a directory, returning a sequence of file dicts.

    TODO: Currently this uses sha1 as hash. Freenet uses 256. But the
    hashes are not used.

    Arguments:
        - dirpath - relative or absolute pathname of directory to scan
        - prefix - relative-path prefix prepended to each 'relpath'
          (used internally when recursing into subdirectories)
        - gethashes - also include a 'hash' key in each file dict, being
          the SHA1 hash of the file's name and contents

    Each returned dict in the sequence has the keys:
        - fullpath - usable for opening/reading file
        - relpath - relative path of file (the part after 'dirpath'),
          for the 'SSK@blahblah//relpath' URI
        - mimetype - guestimated mimetype for file

    Entries are sorted by relpath; '.freesiterc' files and editor
    backups ('*~') are skipped.
    """
    entries = []
    for f in os.listdir(dirpath):
        relpath = prefix + f
        fullpath = os.path.join(dirpath, f)
        # skip freesite config files and editor backup files
        if f == '.freesiterc' or f.endswith("~"):
            continue
        if os.path.isdir(fullpath) \
        or os.path.islink(fullpath) and os.path.isdir(os.path.realpath(fullpath)):
            # recurse into directories (and symlinks to directories)
            entries.extend(
                readdir(
                    os.path.join(dirpath, f),
                    relpath + os.path.sep,
                    gethashes
                    )
                )
        else:
            entry = {'relpath': relpath,
                     'fullpath': fullpath,
                     'mimetype': guessMimetype(f)
                     }
            if gethashes:
                entry['hash'] = hashFile(fullpath)
            entries.append(entry)
    # BUGFIX: was a cmp/lambda sort, which no longer exists in Python 3;
    # a key-based sort is equivalent and works on both
    entries.sort(key=lambda entry: entry['relpath'])

    return entries
3223 3224 #@-node:readdir 3225 #@+node:hashFile
def hashFile(path):
    """
    returns an SHA(1) hash of a file's contents

    >>> oslevelid, filepath = tempfile.mkstemp(text=True)
    >>> with open(filepath, "wb") as f:
    ...     _ = f.write(b"test")
    >>> hashFile(filepath) == hashlib.sha1(b"test").hexdigest()
    True
    """
    # BUGFIX: was 'file(path, "rb").read()', which leaked the file handle
    # and uses the 'file' builtin that no longer exists in Python 3
    with open(path, "rb") as f:
        raw = f.read()
    return hashlib.sha1(raw).hexdigest()
3238
def sha256dda(nodehelloid, identifier, path=None):
    """
    returns a sha256 hash of a file's contents for bypassing TestDDA

    The hashed string is "<nodehelloid>-<identifier>-<file contents>".

    >>> oslevelid, filepath = tempfile.mkstemp(text=True)
    >>> with open(filepath, "wb") as f:
    ...     _ = f.write(b"test")
    >>> sha256dda("1", "2", filepath) == hashlib.sha256(b"1-2-test").digest()
    True
    """
    # BUGFIX: was 'file(path, "rb").read()', leaking the handle and using
    # a Python-2-only builtin
    with open(path, "rb") as f:
        data = f.read()
    # join as bytes so this works on both Python 2 (str == bytes) and
    # Python 3 (where mixing str and bytes in join raises TypeError)
    if not isinstance(nodehelloid, bytes):
        nodehelloid = nodehelloid.encode("utf-8")
    if not isinstance(identifier, bytes):
        identifier = identifier.encode("utf-8")
    tohash = b"-".join([nodehelloid, identifier, data])
    return hashlib.sha256(tohash).digest()
3251 3252 #@-node:hashFile 3253 #@+node:guessMimetype
def guessMimetype(filename):
    """
    Returns a guess of a mimetype based on a filename's extension.

    Falls back to "application/octet-stream" when no type can be
    determined.  Note: ".tar.bz2" names return a (type, encoding)
    tuple rather than a plain string, matching historical behaviour.
    """
    if filename.endswith(".tar.bz2"):
        return ('application/x-tar', 'bzip2')

    try:
        m = mimetypes.guess_type(filename, False)[0]
    except Exception:
        # was a bare except:; keep the fallback but don't swallow
        # SystemExit/KeyboardInterrupt
        m = None
    if m is None: # either an exception or a genuine None
        # FIXME: log(INFO, "Could not find mimetype for filename %s" % filename)
        m = "application/octet-stream"
    return m
3269 3270 3271 _re_slugify = re.compile('[^\w\s\.-]', re.UNICODE) 3272 _re_slugify_multidashes = re.compile('[-\s]+', re.UNICODE)
def toUrlsafe(filename):
    """Make a filename url-safe, keeping only the basename and killing all
    potentially unfitting characters.

    :returns: urlsafe basename of the file as string."""
    filename = os.path.basename(filename)
    # decode byte strings; the old unicode(filename, encoding=...) call
    # raised TypeError when it was already handed a unicode object,
    # so only decode when actually given bytes
    if isinstance(filename, bytes):
        filename = filename.decode("utf-8", "ignore")
    # strip accents etc. down to their closest ascii equivalents
    filename = unicodedata.normalize('NFKD', filename).encode("ascii", "ignore").decode("ascii")
    # drop unsafe characters, then collapse dash/whitespace runs
    filename = _re_slugify.sub('', filename).strip().lower()
    filename = _re_slugify_multidashes.sub('-', filename)
    return str(filename)
3283 3284 3285 #@-node:guessMimetype 3286 #@+node:uriIsPrivate
def uriIsPrivate(uri):
    """
    analyses an SSK URI, and determines if it is an SSK or USK private key

    for details see https://wiki.freenetproject.org/Signed_Subspace_Key

    >>> uriIsPrivate("SSK@~Udj39wzRUN4J-Kqn1aWN8kJyHL6d44VSyWoqSjL60A,iAtIH8348UGKfs8lW3mw0lm0D9WLwtsIzZhvMWelpK0,AQACAAE/")
    False
    >>> uriIsPrivate("SSK@R-skbNbiXqWkqj8FPDTusWyk7u8HLvbdysyRY3eY9A0,iAtIH8348UGKfs8lW3mw0lm0D9WLwtsIzZhvMWelpK0,AQECAAE/")
    True
    >>> uriIsPrivate("USK@AIcCHvrGspY-7J73J3VR-Td3DuPvw3IqCyjjRK6EvJol,hEvqa41cm72Wc9O1AjZ0OoDU9JVGAvHDDswIE68pT7M,AQECAAE/test.R1/0")
    True
    >>> uriIsPrivate("KSK@AIcCHvrGspY-7J73J3VR-Td3DuPvw3IqCyjjRK6EvJol,hEvqa41cm72Wc9O1AjZ0OoDU9JVGAvHDDswIE68pT7M,AQECAAE/test.R1/0")
    False
    >>> uriIsPrivate("SSK@JhtPxdPLx30sRN0c5S2Hhcsif~Yqy1lsGiAx5Wkq7Lo,-e0kLAjmmclSR7uL0TN901tS3iSx2-21Id8tUp4tyzg,AQECAAE/")
    True
    """
    # strip leading stuff
    if uri.startswith("freenet:"):
        uri = uri[8:]
    if uri.startswith("//"):
        uri = uri[2:]
    # actual recognition: only SSK/USK URIs carry the "extra" flags block
    if not (uri.startswith("SSK@") or uri.startswith("USK@")):
        return False
    try:
        symmetric, publicprivate, extra = uri.split(",")[:3]
    except (IndexError, ValueError):
        return False
    if "/" in extra:
        extra = extra.split("/")[0]
    # the 7-char extra block needs one more char for a valid base64 length
    extra += "/"
    # b64decode replaces the deprecated base64.decodestring
    extrabytes = base64.b64decode(extra)
    # second byte of the extra block is the "insert (private) key" flag
    isprivate = extrabytes[1]
    if not isinstance(isprivate, int): # py2: indexing bytes yields a str
        isprivate = ord(isprivate)
    if isprivate:
        return True
    return False
3324 3325 #@-node:uriIsPrivate 3326 #@+node:parseTime
def parseTime(t):
    """
    Parses a time value, recognising suffices like 'm' for minutes,
    's' for seconds, 'h' for hours, 'd' for days, 'w' for weeks,
    'M' for months.

    >>> endings = {'s':1, 'm':60, 'h':60*60, 'd':60*60*24, 'w':60*60*24*7, 'M':60*60*24*30}
    >>> not False in [endings[i]*3 == parseTime("3"+i) for i in endings]
    True

    Returns time value in seconds

    Raises ValueError (a subclass of the Exception previously raised)
    for empty/blank values.
    """
    if not t:
        raise ValueError("Invalid time '%s'" % t)

    # accept ints/floats etc. by going through their string form
    if not isinstance(t, str):
        t = str(t)

    t = t.strip()
    if not t:
        raise ValueError("Invalid time value '%s'" % t)

    # seconds per unit, keyed by suffix character
    endings = {'s':1, 'm':60, 'h':3600, 'd':86400, 'w':86400*7, 'M':86400*30}

    lastchar = t[-1]

    if lastchar in endings: # dict membership; no need for .keys()
        t = t[:-1]
        multiplier = endings[lastchar]
    else:
        multiplier = 1

    return int(t) * multiplier
3360 3361 #@-node:parseTime 3362 #@+node:base64 stuff 3363 # functions to encode/decode base64, freenet alphabet 3364 #@+others 3365 #@+node:base64encode
def base64encode(raw):
    """
    Encodes a string to base64, using the Freenet alphabet

    Arguments:
        - raw - raw bytes to encode

    Returns the encoded string, with '+', '/' and '=' mapped to
    '~', '-' and '_' respectively.
    """
    # b64encode replaces the deprecated base64.encodestring; it emits
    # no newlines, so the old "\n"-stripping pass is unnecessary
    enc = base64.b64encode(raw)
    if not isinstance(enc, str): # py3: b64encode returns bytes
        enc = enc.decode("ascii")

    # convert the characters to freenet encoding scheme
    enc = enc.replace("+", "~")
    enc = enc.replace("/", "-")
    enc = enc.replace("=", "_")

    return enc
3380 3381 #@-node:base64encode 3382 #@+node:base64decode
def base64decode(enc):
    """
    Decodes a freenet-alphabet base64 string back to a binary string.

    Arguments:
        - enc - base64 string to decode
    """
    # TODO: Are underscores actually used anywhere?
    # restore standard '=' padding characters ...
    normalized = enc.replace("_", "=")

    # ... and pad out to a multiple of 4, since Freenet may omit padding
    normalized += "=" * ((-len(normalized)) % 4)

    # decode, mapping the freenet alphabet (~ and -) onto + and /
    return base64.b64decode(normalized, "~-")
3401 3402 #@-node:base64decode 3403 #@-others 3404 3405 #@-node:base64 stuff 3406 #@-others 3407 3408 #@-node:util funcs 3409 #@-others 3410 3411 3412 #@-node:@file node.py 3413 #@-leo 3414
3415 -def _base30hex(integer):
3416 """Turn an integer into a simple lowercase base30hex encoding.""" 3417 base30 = "0123456789abcdefghijklmnopqrst" 3418 b30 = [] 3419 while integer: 3420 b30.append(base30[integer%30]) 3421 integer = int(integer / 30) 3422 return "".join(reversed(b30))
3423 3424
def _test():
    """Run the module doctests; return a frowny per failure, or a
    happy marker with the base30-encoded attempt count."""
    import doctest
    results = doctest.testmod()
    if results.failed:
        return "☹"*results.failed
    return "^_^ (" + _base30hex(results.attempted) + ")"
3431 3432 3433 if __name__ == "__main__": 3434 print _test() 3435