"""
Restful json `CherryPy <http://cherrypy.org/>`_ server.
The server script mounts a `WebSearcher`_ (read_only) or `WebIndexer`_ root.
Standard `CherryPy configuration <http://docs.cherrypy.org/en/latest/config.html>`_ applies,
and the provided `custom tools <#tools>`_ are also configurable.
All request and response bodies are `application/json values <http://tools.ietf.org/html/rfc4627.html#section-2.1>`_.
WebSearcher exposes resources for an IndexSearcher.
In addition to search requests, it provides access to term and document information in the index.
* :meth:`/ <WebSearcher.index>`
* :meth:`/search <WebSearcher.search>`
* :meth:`/docs <WebSearcher.docs>`
* :meth:`/terms <WebSearcher.terms>`
* :meth:`/update <WebSearcher.update>`
* :meth:`/queries <WebSearcher.queries>`
WebIndexer extends WebSearcher, exposing additional resources and methods for an Indexer.
Single documents may be added, deleted, or replaced by a unique indexed field.
Multiple documents may also be added at once, or deleted at once by query.
By default changes are not visible until the update resource is called to commit a new index version.
If a near real-time Indexer is used, then changes are instantly searchable.
In such cases a commit may not have occurred yet, so the index-based :meth:`last-modified header <validate>` shouldn't be used for caching.
* :meth:`/ <WebIndexer.index>`
* :meth:`/search <WebIndexer.search>`
* :meth:`/docs <WebIndexer.docs>`
* :meth:`/fields <WebIndexer.fields>`
* :meth:`/update <WebIndexer.update>`
Custom servers should create and mount WebSearchers and WebIndexers as needed.
:meth:`Caches <WebSearcher.update>` and :meth:`field settings <WebIndexer.fields>` can then be applied directly before `starting <#start>`_ the server.
WebSearchers and WebIndexers can of course also be subclassed for custom interfaces.
CherryPy and Lucene VM integration issues:
* Monitors (such as autoreload) are not compatible with the VM unless threads are attached.
* Worker threads must also be attached to the VM.
* VM initialization must occur after daemonizing.
* It is recommended that the VM ignore keyboard interrupts (-Xrs) for a clean server shutdown.
"""
from future_builtins import map, zip
import re
import time
import httplib
import heapq
import collections
import itertools
import os
import argparse
import contextlib
import lucene
import cherrypy
from . import engine, client
from .utils import json, suppress
def tool(hook):
    """Return decorator to register tool at given hook point.

    The decorated function is installed on ``cherrypy.tools`` under its own name,
    wrapped in a ``cherrypy.Tool`` bound to ``hook``; the function itself is returned unchanged.

    :param hook: cherrypy hook point name, e.g. 'before_handler'
    """
    def register(func):
        name = func.__name__
        setattr(cherrypy.tools, name, cherrypy.Tool(hook, func))
        return func
    return register
@tool('before_request_body')
def json_in(process_body=None, **kwargs):
    """Handle request bodies in json format.

    Delegates to ``cherrypy.lib.jsontools.json_in`` with a processor which decodes the body
    into ``request.json``. The ``force`` option is enabled only when the request supplies a
    content-type header.

    :param process_body: optional function to process the decoded body into request.params;
        a TypeError or ValueError raised while processing is reported as 400 Bad Request
    :param kwargs: additional options passed to cherrypy's json_in tool (e.g. content_type)
    """
    request = cherrypy.serving.request

    def processor(entity):
        cherrypy.lib.jsontools.json_processor(entity)
        if process_body is not None:
            # e.g. dict() rejects a non-object body with TypeError ([1]) or ValueError (["ab", "c"])
            with HTTPError(httplib.BAD_REQUEST, TypeError, ValueError):
                request.params.update(process_body(request.json))
    cherrypy.lib.jsontools.json_in(force='content-type' in request.headers, processor=processor, **kwargs)
@tool('before_handler')
def json_out(content_type='application/json', indent=None, **kwargs):
    """Handle responses in json format.

    Delegates to ``cherrypy.lib.jsontools.json_out`` with a handler which serializes the page
    handler's return value; if the page handler changed the response content-type away from
    ``content_type``, its return value is passed through untouched.

    :param content_type: response content-type header
    :param indent: indentation level for pretty printing
    """
    def handler(*args, **params):
        body = cherrypy.request._json_inner_handler(*args, **params)
        if cherrypy.response.headers['content-type'] != content_type:
            return body
        return json.dumps(body, indent=indent)
    cherrypy.lib.jsontools.json_out(content_type, handler=handler, **kwargs)
@tool('on_start_resource')
def allow(methods=None, paths=(), **kwargs):
    """Only allow specified methods.

    :param methods: allowed http methods, passed to cherrypy's allow tool
    :param paths: optional sequence of allowed methods, indexed by the number of positional
        path arguments of the handler; more arguments than entries results in 404 Not Found
    """
    handler = cherrypy.request.handler
    if paths and hasattr(handler, 'args'):
        depth = len(handler.args)
        with HTTPError(httplib.NOT_FOUND, IndexError):
            methods = paths[depth]
    cherrypy.lib.cptools.allow(methods, **kwargs)
@tool('before_finalize')
def timer():
    """Return response time in headers.

    Sets ``x-response-time`` to the seconds elapsed since ``response.time``.
    """
    response = cherrypy.serving.response
    elapsed = time.time() - response.time
    response.headers['x-response-time'] = elapsed
@tool('on_start_resource')
def validate(etag=True, last_modified=False, max_age=None, expires=None):
    """Return and validate caching headers.

    :param etag: return weak entity tag header based on index version and validate if-match headers
    :param last_modified: return last-modified header based on index timestamp and validate if-modified headers
    :param max_age: return cache-control max-age and age headers based on last update timestamp
    :param expires: return expires header offset from last update timestamp
    """
    root = cherrypy.request.app.root
    headers = cherrypy.response.headers
    cptools, httputil = cherrypy.lib.cptools, cherrypy.lib.httputil
    if etag:
        headers['etag'] = root.etag
        cptools.validate_etags()
    if last_modified:
        headers['last-modified'] = httputil.HTTPDate(root.searcher.timestamp)
        cptools.validate_since()
    if max_age is not None:
        # age is measured from the root's last update, in whole seconds
        headers['age'] = int(time.time() - root.updated)
        headers['cache-control'] = 'max-age={}'.format(max_age)
    if expires is not None:
        headers['expires'] = httputil.HTTPDate(root.updated + expires)
@tool('before_handler')
def params(**types):
    """Convert specified request params.

    :param types: mapping of param name to conversion function; only params present in
        the request are converted, and a ValueError is reported as 400 Bad Request
    """
    request_params = cherrypy.request.params
    with HTTPError(httplib.BAD_REQUEST, ValueError):
        for name in set(types).intersection(request_params):
            convert = types[name]
            request_params[name] = convert(request_params[name])
def multi(value):
    """Split a comma-separated param value into a list.

    Falsy values (e.g. None or '') are returned unchanged.
    """
    if not value:
        return value
    return value.split(',')
class parse:
    "Parameter parsing."

    @staticmethod
    def q(searcher, q, **options):
        """Return a query built from the ``q`` param and its ``q.*`` options, or None.

        ``q.field`` may be a single name or a list, each optionally boosted as ``name^boost``.
        ``q.type`` names an ``engine.Query`` constructor which skips parsing; remaining options
        (except op and version) are json decoded and passed to ``searcher.parse``.
        Malformed boosts, unknown types, invalid json and parse errors are 400 Bad Request.
        """
        options = {key.partition('.')[-1]: options[key] for key in options if key.startswith('q.')}
        field = options.pop('field', [])
        fields = [field] if isinstance(field, basestring) else field
        fields = [name.partition('^')[::2] for name in fields]
        if any(boost for name, boost in fields):
            # boosts come from the client, so a malformed number is a bad request
            with HTTPError(httplib.BAD_REQUEST, ValueError):
                field = {name: float(boost or 1.0) for name, boost in fields}
        elif isinstance(field, basestring):
            (field, boost), = fields
        else:
            field = [name for name, boost in fields] or ''
        if 'type' in options:
            # NOTE(review): the attribute name comes directly from the request and may select any
            # public attribute of engine.Query; consider restricting to term|prefix|wildcard
            with HTTPError(httplib.BAD_REQUEST, AttributeError):
                return getattr(engine.Query, options['type'])(field, q)
        for key in set(options) - {'op', 'version'}:
            with HTTPError(httplib.BAD_REQUEST, ValueError):
                options[key] = json.loads(options[key])
        if q is not None:
            with HTTPError(httplib.BAD_REQUEST, lucene.JavaError):
                return searcher.parse(q, field=field, **options)

    @staticmethod
    def fields(searcher, fields=None, **options):
        """Return field selections for document retrieval.

        :return: (stored field mapping or None, multi-valued stored field names which are not
            indexed, mapping of indexed field names to cached comparators)
        """
        if fields is not None:
            fields = dict.fromkeys(fields)
        multi = set(options.get('fields.multi', ()))
        # indexed fields are given as name[:type]
        indexed = (field.split(':') for field in options.get('fields.indexed', ()))
        indexed = {item[0]: searcher.comparator(*item, multi=item[0] in multi) for item in indexed}
        return fields, multi.difference(indexed), indexed
def json_error(version, **body):
    """Transform errors into json format.

    Installed as cherrypy's ``error_page.default``; the keyword arguments cherrypy supplies
    (other than version) become the members of the returned json object.
    The content type and indentation follow the json_out tool's config when it is present.
    """
    # tolerate requests whose config has no tools namespace; raising here would mask the original error
    config = cherrypy.request.toolmaps.get('tools', {}).get('json_out', {})
    cherrypy.response.headers['content-type'] = config.get('content_type', 'application/json')
    return json.dumps(body, indent=config.get('indent'))
def attach_thread(id=None):
    """Attach current cherrypy worker thread to lucene VM.

    :param id: unused; accepted so this can be subscribed to the engine's 'start_thread'
        channel (see start), which presumably passes a thread identifier
    """
    env = lucene.getVMEnv()
    env.attachCurrentThread()
class Autoreloader(cherrypy.process.plugins.Autoreloader):
    "Autoreload monitor compatible with lucene VM."

    def run(self):
        """Attach the monitor's thread to the VM, then run the base autoreload check."""
        attach_thread()
        super(Autoreloader, self).run()
class AttachedMonitor(cherrypy.process.plugins.Monitor):
    """Periodically run a callback function in an attached thread.

    Subscribing while the engine is already started also starts the monitor immediately,
    and unsubscribing cancels its background thread if one is running.
    """

    def __init__(self, bus, callback, frequency=cherrypy.process.plugins.Monitor.frequency):
        def run():
            attach_thread()
            callback()
        cherrypy.process.plugins.Monitor.__init__(self, bus, run, frequency)

    def subscribe(self):
        cherrypy.process.plugins.Monitor.subscribe(self)
        # the engine's start channel has already been published, so start explicitly
        if cherrypy.engine.state == cherrypy.engine.states.STARTED:
            self.start()

    def unsubscribe(self):
        cherrypy.process.plugins.Monitor.unsubscribe(self)
        # the thread is only created once the monitor starts; a never-started monitor has none.
        # NOTE(review): cancel is used directly, presumably because this may run inside the
        # monitor's own thread (update -> mount), where a stop()/join would be unsuitable
        if self.thread is not None:
            self.thread.cancel()
@contextlib.contextmanager
def HTTPError(status, *exceptions):
    """Interpret exceptions as an HTTPError with given status code.

    Any of the listed exception types raised inside the ``with`` block is re-raised as
    ``cherrypy.HTTPError(status, str(exc))``; other exceptions propagate unchanged.
    """
    try:
        yield
    except exceptions as error:
        message = str(error)
        raise cherrypy.HTTPError(status, message)
class WebSearcher(object):
    """Dispatch root with a delegated Searcher.

    :param directories: index directories; more than one uses a MultiSearcher
    :param hosts: ordered hosts to synchronize with
    :param kwargs: additional searcher options
    """
    _cp_config = {
        'tools.gzip.on': True, 'tools.gzip.mime_types': ['text/html', 'text/plain', 'application/json'],
        'tools.accept.on': True, 'tools.accept.media': 'application/json',
        'tools.json_in.on': True, 'tools.json_out.on': True,
        'tools.allow.on': True, 'tools.timer.on': True,
        'tools.validate.on': True, 'error_page.default': json_error,
    }

    def __init__(self, *directories, **kwargs):
        self.hosts = collections.deque(kwargs.pop('hosts', ()))
        if self.hosts:
            # NOTE(review): presumably ensures a local index exists before syncing — confirm
            engine.IndexWriter(*directories).close()
        if len(directories) > 1:
            self.searcher = engine.MultiSearcher(directories, **kwargs)
        else:
            self.searcher = engine.IndexSearcher(*directories, **kwargs)
        self.updated = time.time()
        self.query_map = {}

    @classmethod
    def new(cls, *args, **kwargs):
        "Return new uninitialized root which can be mounted on dispatch tree before VM initialization."
        self = object.__new__(cls)
        self.args, self.kwargs = args, kwargs
        return self

    def close(self):
        self.searcher.close()

    @property
    def etag(self):
        "Weak entity tag derived from the searcher's index version."
        return 'W/"{}"'.format(self.searcher.version)

    def sync(self, host, path=''):
        """Sync with remote index.

        Requests a snapshot from the remote root, downloads the referenced files missing from
        the local index directory, and always releases the remote snapshot afterwards.

        :param host: remote host, optionally with port
        :param path: optional mount path of the remote root
        :return: sorted names of downloaded files; empty if the remote version matches the local etag
        """
        directory = self.searcher.path
        resource = client.Resource(host)
        resource.headers = dict(resource.headers, **{'if-none-match': self.etag})
        prefix = path.strip('/')
        # avoid a leading '//' when no mount path is given
        url = '/{}/update/snapshot'.format(prefix) if prefix else '/update/snapshot'
        response = resource.call('PUT', url)
        if response.status == httplib.PRECONDITION_FAILED:
            return []
        names = sorted(set(response()).difference(os.listdir(directory)))
        location = response.getheader('location') + '/'
        try:
            for name in names:
                resource.download(location + name, os.path.join(directory, name))
        finally:
            resource.delete(location)
        return names

    @cherrypy.expose
    @cherrypy.tools.json_in(process_body=dict)
    @cherrypy.tools.allow(methods=['GET', 'POST'])
    def index(self, host='', path=''):
        r"""Return index information and synchronize with remote index.

        **GET, POST** /[index]
            Return a mapping of the directory to the document count.
            Add new segments from remote host.

            {"host": *string*\ [, "path": *string*]}

            :return: {*string*: *int*,... }
        """
        if cherrypy.request.method == 'POST':
            self.sync(host, path)
            cherrypy.response.status = httplib.ACCEPTED
        if isinstance(self.searcher, engine.MultiSearcher):
            return {reader.directory().toString(): reader.numDocs() for reader in self.searcher.indexReaders}
        return {self.searcher.directory.toString(): len(self.searcher)}

    @cherrypy.expose
    @cherrypy.tools.json_in(process_body=dict)
    @cherrypy.tools.allow(methods=['POST'])
    def update(self, **caches):
        r"""Refresh index version.

        **POST** /update
            Reopen searcher, optionally reloading caches, and return document count.

            {"filters"|"sorters"|"spellcheckers": true,... }

            .. versionchanged:: 1.2 request body is an object instead of an array

            :return: *int*
        """
        names = ()
        while self.hosts:
            host = self.hosts[0]
            try:
                # split once: the mount path itself may contain further slashes
                names = self.sync(*host.split('/', 1))
                break
            except IOError:
                # unreachable host; it may already have been removed, hence the suppression
                with suppress(ValueError):
                    self.hosts.remove(host)
            except httplib.HTTPException as exc:
                # a remote root which doesn't allow snapshots (e.g. read-only) ends syncing
                if exc[0] != httplib.METHOD_NOT_ALLOWED:
                    raise
                break
        self.searcher = self.searcher.reopen(**caches)
        self.updated = time.time()
        if names:
            engine.IndexWriter(self.searcher.directory).close()
        if not self.hosts and hasattr(self, 'fields'):
            # no hosts remain: replace this root with a writable indexer on the same app.
            # NOTE(review): `fields` is presumably assigned externally with field settings — confirm
            other = WebIndexer(self.searcher.directory, analyzer=self.searcher.analyzer)
            other.indexer.shared, other.indexer.fields = self.searcher.shared, self.fields
            app, = (app for app in cherrypy.tree.apps.values() if app.root is self)
            mount(other, app=app, autoupdate=getattr(self, 'autoupdate', 0))
        return len(self.searcher)

    @cherrypy.expose
    @cherrypy.tools.params(**dict.fromkeys(['fields', 'fields.multi', 'fields.indexed', 'fields.vector', 'fields.vector.counts'], multi))
    def docs(self, name=None, value='', **options):
        r"""Return ids or documents.

        **GET** /docs
            Return array of doc ids.

            :return: [*int*,... ]

        **GET** /docs/[*int*\|\ *chars*/*chars*]?
            Return document mapping from id or unique name and value.

            &fields=\ *chars*,... &fields.multi=\ *chars*,... &fields.indexed=\ *chars*\ [:*chars*],...
                optionally select stored, multi-valued, and cached indexed fields

            &fields.vector=\ *chars*,... &fields.vector.counts=\ *chars*,...
                optionally select term vectors with term counts

            :return: {*string*: null|\ *string*\|\ *number*\|\ *array*\|\ *object*,... }
        """
        searcher = self.searcher
        if not name:
            return list(searcher)
        with HTTPError(httplib.NOT_FOUND, ValueError):
            # a unique name and value must match exactly one document
            id, = searcher.docs(name, value) if value else [int(name)]
        fields, multi, indexed = parse.fields(searcher, **options)
        with HTTPError(httplib.NOT_FOUND, lucene.JavaError):
            doc = searcher[id] if fields is None else searcher.get(id, *itertools.chain(fields, multi))
        result = doc.dict(*multi, **(fields or {}))
        result.update((name, indexed[name][id]) for name in indexed)
        result.update((field, list(searcher.termvector(id, field))) for field in options.get('fields.vector', ()))
        result.update((field, dict(searcher.termvector(id, field, counts=True))) for field in options.get('fields.vector.counts', ()))
        return result

    @cherrypy.expose
    @cherrypy.tools.params(count=int, start=int, fields=multi, sort=multi, facets=multi, hl=multi, mlt=int, spellcheck=int, timeout=float, **{
        'fields.multi': multi, 'fields.indexed': multi, 'facets.count': int, 'facets.min': int,
        'group.count': int, 'hl.count': int, 'mlt.fields': multi,
    })
    def search(self, q=None, count=None, start=0, fields=None, sort=None, facets='', group='', hl='', mlt=None, spellcheck=0, timeout=None, **options):
        r"""Run query and return documents.

        **GET** /search?
            Return array of document objects and total doc count.

            &q=\ *chars*\ &q.type=[term|prefix|wildcard]&q.\ *chars*\ =...,
                query, optional type to skip parsing, and optional parser settings: q.field, q.op,...

            &filter=\ *chars*
                | cached filter applied to the query
                | if a previously cached filter is not found, the value will be parsed as a query

            &count=\ *int*\ &start=0
                maximum number of docs to return and offset to start at

            &fields=\ *chars*,... &fields.multi=\ *chars*,... &fields.indexed=\ *chars*\ [:*chars*],...
                only include selected stored fields; multi-valued fields returned in an array; indexed fields with optional type are cached

            &sort=\ [-]\ *chars*\ [:*chars*],... &sort.scores[=max]
                | field name, optional type, minus sign indicates descending
                | optionally score docs, additionally compute maximum score

            &facets=\ *chars*,... &facets.count=\ *int*\&facets.min=0
                | include facet counts for given field names; facets filters are cached
                | optional maximum number of most populated facet values per field, and minimum count to return

            &group=\ *chars*\ [:*chars*]&group.count=1
                | group documents by field value with optional type, up to given maximum count

                .. versionchanged:: 1.6 grouping searches use count and start options

            &hl=\ *chars*,... &hl.count=1&hl.tag=strong&hl.enable=[fields|terms]
                | stored fields to return highlighted
                | optional maximum fragment count and html tag name
                | optionally enable matching any field or any term

            &mlt=\ *int*\ &mlt.fields=\ *chars*,... &mlt.\ *chars*\ =...,
                | doc index (or id without a query) to find MoreLikeThis
                | optional document fields to match
                | optional MoreLikeThis settings: mlt.minTermFreq, mlt.minDocFreq,...

            &spellcheck=\ *int*
                | maximum number of spelling corrections to return for each query term, grouped by field
                | original query is still run; use q.spellcheck=true to affect query parsing

            &timeout=\ *number*
                timeout search after elapsed number of seconds

            :return:
                | {
                | "query": *string*\|null,
                | "count": *int*\|null,
                | "maxscore": *number*\|null,
                | "docs": [{"__id__": *int*, "__score__": *number*, "__keys__": *array*,
                    "__highlights__": {*string*: *array*,... }, *string*: *value*,... },... ],
                | "facets": {*string*: {*string*: *int*,... },... },
                | "groups": [{"count": *int*, "value": *value*, "docs": [*object*,... ]},... ]
                | "spellcheck": {*string*: {*string*: [*string*,... ],... },... },
                | }
        """
        searcher = self.searcher
        if sort is not None:
            # a sort spec which doesn't match the pattern yields None, i.e. AttributeError on .groups()
            with HTTPError(httplib.BAD_REQUEST, AttributeError):
                specs = [re.match(r'(-?)(\w+):?(\w*)', spec).groups() for spec in sort]
                sort = [searcher.sorter(sname, kind or 'string', reverse=(sign == '-')) for sign, sname, kind in specs]
        q = parse.q(searcher, q, **options)
        qfilter = options.pop('filter', None)
        if qfilter is not None and qfilter not in searcher.filters:
            searcher.filters[qfilter] = engine.Query.filter(parse.q(searcher, qfilter, **options))
        qfilter = searcher.filters.get(qfilter)
        if mlt is not None:
            if q is not None:
                # mlt is the index of a hit of the query; an index beyond the hits is a bad request
                with HTTPError(httplib.BAD_REQUEST, ValueError):
                    mlt, = searcher.search(q, count=mlt + 1, sort=sort)[mlt:].ids
            mltfields = options.pop('mlt.fields', ())
            with HTTPError(httplib.BAD_REQUEST, ValueError):
                attrs = {key.partition('.')[-1]: json.loads(options[key]) for key in options if key.startswith('mlt.')}
            q = searcher.morelikethis(mlt, *mltfields, analyzer=searcher.analyzer, **attrs)
        if count is not None:
            count += start
        if count == 0:
            # presumably so only totals are returned: fetch a single hit and skip it
            start = count = 1
        scores = options.get('sort.scores')
        gcount = options.get('group.count', 1)
        scores = {'scores': scores is not None, 'maxscore': scores == 'max'}
        if ':' in group or group in searcher.sorters:
            hits = searcher.search(q, filter=qfilter, sort=sort, timeout=timeout, **scores)
            with HTTPError(httplib.BAD_REQUEST, AttributeError):
                groups = hits.groupby(searcher.comparator(*group.split(':')).__getitem__, count=count, docs=gcount)
            groups.groupdocs = groups.groupdocs[start:]
        elif group:
            scores = {'includeScores': scores['scores'], 'includeMaxScore': scores['maxscore']}
            groups = searcher.groupby(group, q, qfilter, count, start, sort=sort, groupDocsLimit=gcount, **scores)
        else:
            hits = searcher.search(q, filter=qfilter, sort=sort, count=count, timeout=timeout, **scores)
            groups = engine.documents.Groups(searcher, [hits[start:]], hits.count, hits.maxscore)
        result = {'query': q and unicode(q), 'count': groups.count, 'maxscore': groups.maxscore}
        tag, enable = options.get('hl.tag', 'strong'), options.get('hl.enable', '')
        hlcount = options.get('hl.count', 1)
        if hl:
            hl = {name: searcher.highlighter(q, name, terms='terms' in enable, fields='fields' in enable, tag=tag) for name in hl}
        fields, multi, indexed = parse.fields(searcher, fields, **options)
        if fields is None:
            fields = {}
        else:
            groups.select(*itertools.chain(fields, multi))
        result['groups'] = []
        for hits in groups:
            docs = []
            for hit in hits:
                doc = hit.dict(*multi, **fields)
                doc.update((name, indexed[name][hit.id]) for name in indexed)
                fragments = (hl[name].fragments(hit.id, hlcount) for name in hl)  # pragma: no branch
                if hl:
                    doc['__highlights__'] = {name: value for name, value in zip(hl, fragments) if value is not None}
                docs.append(doc)
            result['groups'].append({'docs': docs, 'count': hits.count, 'value': getattr(hits, 'value', None)})
        if not group:
            result['docs'] = result.pop('groups')[0]['docs']
        q = q or engine.Query.alldocs()
        if facets:
            facets = (tuple(facet.split(':')) if ':' in facet else facet for facet in facets)
            facets = result['facets'] = searcher.facets(q, *facets)
            for counts in facets.values():
                counts.pop(None, None)
            if 'facets.min' in options:
                for name, counts in facets.items():
                    facets[name] = {term: count for term, count in counts.items() if count >= options['facets.min']}
            if 'facets.count' in options:
                for name, counts in facets.items():
                    facets[name] = {term: counts[term] for term in heapq.nlargest(options['facets.count'], counts, key=counts.__getitem__)}
        if spellcheck:
            terms = result['spellcheck'] = collections.defaultdict(dict)
            for name, value in engine.Query.terms(q):
                terms[name][value] = list(itertools.islice(searcher.correct(name, value), spellcheck))
        return result

    @cherrypy.expose
    @cherrypy.tools.params(count=int, step=int, indexed=json.loads)
    def terms(self, name='', value='*', *path, **options):
        r"""Return data about indexed terms.

        **GET** /terms?
            Return field names, with optional selection.

            &indexed=true|false

            :return: [*string*,... ]

        **GET** /terms/*chars*\[:int|float\]?step=0
            Return term values for given field name, with optional type and step for numeric encoded values.

            :return: [*string*,... ]

        **GET** /terms/*chars*/*chars*\[\*\|:*chars*\|~[\ *int*\]]
            Return term values (prefix, slices, or fuzzy terms) for given field name.

            :return: [*string*,... ]

        **GET** /terms/*chars*/*chars*\[\*\|~[\ *int*\]\]?count=\ *int*
            Return spellchecked term values ordered by decreasing document frequency.
            Prefixes (*) are optimized to be suitable for real-time query suggestions; all terms are cached.

            :return: [*string*,... ]

        **GET** /terms/*chars*/*chars*
            Return document count for given term.

            :return: *int*

        **GET** /terms/*chars*/*chars*/docs
            Return document ids for given term.

            :return: [*int*,... ]

        **GET** /terms/*chars*/*chars*/docs/counts
            Return document ids and frequency counts for given term.

            :return: [[*int*, *int*],... ]

        **GET** /terms/*chars*/*chars*/docs/positions
            Return document ids and positions for given term.

            :return: [[*int*, [*int*,... ]],... ]
        """
        searcher = self.searcher
        if not name:
            return sorted(searcher.names(**options))
        if ':' in name:
            with HTTPError(httplib.BAD_REQUEST, ValueError, KeyError):
                name, type = name.split(':')
                # an explicit table of the documented types: __builtins__ is a dict (not a module)
                # when this module is imported, and the client must not select arbitrary builtins
                type = {'int': int, 'float': float}[type]
            return list(searcher.numbers(name, step=options.get('step', 0), type=type))
        if ':' in value:
            return list(searcher.terms(name, *value.split(':')))
        if value.endswith('*'):
            value = value.rstrip('*')
            if 'count' in options:
                return searcher.suggest(name, value, options['count'])
            return list(searcher.terms(name, value))
        if '~' in value:
            with HTTPError(httplib.BAD_REQUEST, ValueError):
                value, distance = value.split('~')
                distance = int(distance or 2)
            if 'count' in options:
                return list(itertools.islice(searcher.correct(name, value, distance), options['count']))
            return list(searcher.terms(name, value, distance=distance))
        if not path:
            return searcher.count(name, value)
        if path[0] == 'docs':
            if path[1:] == ():
                return list(searcher.docs(name, value))
            if path[1:] == ('counts',):
                return list(searcher.docs(name, value, counts=True))
            if path[1:] == ('positions',):
                return list(searcher.positions(name, value))
        raise cherrypy.NotFound()

    @cherrypy.expose
    @cherrypy.tools.allow(paths=[('GET',), ('GET', 'POST'), ('GET', 'PUT', 'DELETE')])
    def queries(self, name='', value=''):
        r"""Match a document against registered queries.

        Queries are cached by a unique name and value, suitable for document indexing.

        .. versionadded:: 1.4

        **GET** /queries
            Return query set names.

            :return: [*string*,... ]

        **GET, POST** /queries/*chars*
            Return query values and scores which match given document.

            {*string*: *string*,... }

            :return: {*string*: *number*,... }

        **GET, PUT, DELETE** /queries/*chars*/*chars*
            Return, create, or delete a registered query.

            *string*

            :return: *string*
        """
        request = cherrypy.serving.request
        if not name:
            return sorted(self.query_map)
        if not value:
            if request.method == 'GET':
                # request bodies aren't processed automatically for GET
                request.body.process()
            with HTTPError(httplib.NOT_FOUND, KeyError):
                queries = self.query_map[name]
            scores = self.searcher.match(getattr(request, 'json', {}), *queries.values())
            return dict(zip(queries, scores))
        if request.method == 'DELETE':
            return str(self.query_map.get(name, {}).pop(value, '')) or None
        if request.method == 'PUT':
            # parse before registering anything, so a missing or invalid body is a bad request
            # and doesn't leave an empty query set behind
            with HTTPError(httplib.BAD_REQUEST, AttributeError, lucene.JavaError):
                query = self.searcher.parse(request.json)
            queries = self.query_map.setdefault(name, {})
            if value not in queries:
                cherrypy.response.status = httplib.CREATED
            queries[value] = query
        with HTTPError(httplib.NOT_FOUND, KeyError):
            return str(self.query_map[name][value])
class WebIndexer(WebSearcher):
    "Dispatch root with a delegated Indexer, exposing write methods."

    def __init__(self, *args, **kwargs):
        self.indexer = engine.Indexer(*args, **kwargs)
        self.updated = time.time()
        self.query_map = {}

    @property
    def searcher(self):
        return self.indexer.indexSearcher

    def close(self):
        self.indexer.close()
        WebSearcher.close(self)

    def refresh(self):
        """Make changes searchable if the indexer is near real-time; otherwise respond 202 Accepted."""
        if self.indexer.nrt:
            self.indexer.refresh()
            self.updated = time.time()
        else:
            cherrypy.response.status = httplib.ACCEPTED

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET', 'POST'])
    def index(self):
        r"""Add indexes. See :meth:`WebSearcher.index` for GET method.

        **POST** /[index]
            Add indexes without optimization.

            [*string*,... ]
        """
        request = cherrypy.serving.request
        if request.method == 'POST':
            for directory in getattr(request, 'json', ()):
                self.indexer += directory
            self.refresh()
        return {unicode(self.indexer.directory): len(self.indexer)}

    @cherrypy.expose
    @cherrypy.tools.json_in(process_body=dict)
    @cherrypy.tools.allow(paths=[('POST',), ('GET', 'PUT', 'DELETE'), ('GET',)])
    def update(self, id='', name='', **options):
        r"""Commit index changes and refresh index version.

        **POST** /update
            Commit write operations and return document count. See :meth:`WebSearcher.update` for caching options.

            {"merge": true|\ *int*,... }

            .. versionchanged:: 1.2 request body is an object instead of an array

            :return: *int*

        **GET, PUT, DELETE** /update/[snapshot|\ *int*]
            Verify, create, or release unique snapshot of current index commit and return array of referenced filenames.

            .. versionchanged:: 1.4 lucene identifies snapshots by commit generation; use location header

            :return: [*string*,... ]

        **GET** /update/*int*/*chars*
            Download index file corresponding to snapshot id and filename.
        """
        if not id:
            self.indexer.commit(**options)
            self.updated = time.time()
            return len(self.indexer)
        method = cherrypy.request.method
        response = cherrypy.serving.response
        if method == 'PUT':
            if id != 'snapshot':
                raise cherrypy.NotFound()
            commit = self.indexer.policy.snapshot()
            response.status = httplib.CREATED
            response.headers['location'] = cherrypy.url('/update/{0:d}'.format(commit.generation), relative='server')
        else:
            with HTTPError(httplib.NOT_FOUND, ValueError):
                commit = self.indexer.policy.getIndexCommit(long(id))
            # explicit checks rather than assert, which is stripped under python -O
            if commit is None:
                raise cherrypy.HTTPError(httplib.NOT_FOUND, 'commit not snapshotted')
            if method == 'DELETE':
                self.indexer.policy.release(commit)
        if not name:
            return list(commit.fileNames)
        with HTTPError(httplib.NOT_FOUND, TypeError):
            directory = self.searcher.path
            referenced = name in commit.fileNames
        # only files referenced by the commit are served, which also rules out path traversal
        if not referenced:
            raise cherrypy.HTTPError(httplib.NOT_FOUND, 'file not referenced in commit')
        return cherrypy.lib.static.serve_download(os.path.join(directory, name))

    @cherrypy.expose
    @cherrypy.tools.allow(paths=[('GET', 'POST'), ('GET',), ('GET', 'PUT', 'DELETE', 'PATCH')])
    def docs(self, name=None, value='', **options):
        r"""Add or return documents. See :meth:`WebSearcher.docs` for GET method.

        **POST** /docs
            Add documents to index.

            [{*string*: *string*\|\ *number*\|\ *array*,... },... ]

        **PUT, DELETE** /docs/*chars*/*chars*
            Set or delete document. Unique term should be indexed and is added to the new document.

            {*string*: *string*\|\ *number*\|\ *array*,... }
        """
        request = cherrypy.serving.request
        if request.method in ('GET', 'HEAD'):
            return WebSearcher.docs(self, name, value, **options)
        if request.method == 'DELETE':
            self.indexer.delete(name, value)
        elif request.method == 'POST':
            for doc in getattr(request, 'json', ()):
                self.indexer.add(doc)
        else:
            doc = getattr(request, 'json', {})
            # explicit checks rather than assert, which is stripped under python -O
            with HTTPError(httplib.CONFLICT, KeyError):
                indexed = self.indexer.fields[name].indexed()
            if not indexed:
                raise cherrypy.HTTPError(httplib.CONFLICT, 'unique field must be indexed')
            if request.method == 'PUT':
                # setdefault must always run: it adds the unique term to the new document
                if doc.setdefault(name, value) != value:
                    raise cherrypy.HTTPError(httplib.BAD_REQUEST, 'multiple values for unique field')
            else:
                # PATCH: every supplied field must have a doc value type
                with HTTPError(httplib.CONFLICT, KeyError):
                    updatable = all(self.indexer.fields[field].docValueType() for field in doc)
                if not updatable:
                    raise cherrypy.HTTPError(httplib.CONFLICT, 'partial updates require doc value fields')
            self.indexer.update(name, value, doc)
        self.refresh()
    # inherit the GET param conversions configured on WebSearcher.docs
    docs._cp_config.update(WebSearcher.docs._cp_config)
    docs._cp_config['request.methods_with_bodies'] = ('POST', 'PUT', 'PATCH')

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET', 'DELETE'])
    def search(self, q=None, **options):
        r"""Run or delete a query. See :meth:`WebSearcher.search` for GET method.

        **DELETE** /search?q=\ *chars*
            Delete documents which match query.
        """
        if cherrypy.request.method != 'DELETE':
            return WebSearcher.search(self, q, **options)
        if q is None:
            self.indexer.deleteAll()
        else:
            self.indexer.delete(parse.q(self.searcher, q, **options))
        self.refresh()
    # inherit the GET param conversions configured on WebSearcher.search
    search._cp_config.update(WebSearcher.search._cp_config)

    @cherrypy.expose
    @cherrypy.tools.json_in(process_body=dict)
    @cherrypy.tools.allow(paths=[('GET',), ('GET', 'PUT')])
    @cherrypy.tools.validate(on=False)
    def fields(self, name='', **settings):
        r"""Return or store a field's settings.

        **GET** /fields
            Return known field names.

            :return: [*string*,... ]

        **GET, PUT** /fields/*chars*
            Set and return settings for given field name.

            {"stored"|"indexed"\|...: *string*\|true|false,... }

            .. versionchanged:: 1.6 lucene FieldType attributes used as settings

            :return: {"stored"|"indexed"\|...: *string*\|true|false,... }
        """
        if not name:
            return sorted(self.indexer.fields)
        if cherrypy.request.method == 'PUT':
            if name not in self.indexer.fields:
                cherrypy.response.status = httplib.CREATED
            with HTTPError(httplib.BAD_REQUEST, AttributeError):
                self.indexer.set(name, **settings)
        with HTTPError(httplib.NOT_FOUND, KeyError):
            return self.indexer.fields[name].settings
def init(vmargs='-Xrs,-Djava.awt.headless=true', **kwargs):
    """Callback to initialize VM and app roots after daemonizing.

    Mounted roots created with :meth:`WebSearcher.new` are initialized with their deferred
    arguments; roots which were already constructed normally are left untouched.

    :param vmargs: comma-separated java VM arguments; -Xrs makes the VM ignore keyboard interrupts
    :param kwargs: additional lucene.initVM options
    """
    if vmargs:
        kwargs['vmargs'] = vmargs
    lucene.initVM(**kwargs)
    for app in cherrypy.tree.apps.values():
        root = app.root
        # only roots from new() carry deferred args; others would raise KeyError on pop
        if isinstance(root, WebSearcher) and 'args' in vars(root):
            state = vars(root)
            root.__init__(*state.pop('args'), **state.pop('kwargs'))
def mount(root, path='', config=None, autoupdate=0, app=None):
    """Attach root and subscribe to plugins.

    The root's close method is subscribed to the engine's stop channel, and with autoupdate
    an AttachedMonitor periodically calls the root's update method.

    :param root,path,config: see cherrypy.tree.mount
    :param autoupdate: see command-line options
    :param app: optionally replace root on existing app, detaching the previous root's plugins
    """
    if app is not None:
        previous = app.root
        cherrypy.engine.unsubscribe('stop', previous.close)
        if hasattr(previous, 'monitor'):
            previous.monitor.unsubscribe()
        app.root = root
    else:
        app = cherrypy.tree.mount(root, path, config)
    cherrypy.engine.subscribe('stop', root.close)
    if autoupdate:
        monitor = root.monitor = AttachedMonitor(cherrypy.engine, root.update, autoupdate)
        monitor.subscribe()
    return app
def start(root=None, path='', config=None, pidfile='', daemonize=False, autoreload=0, autoupdate=0, callback=None):
    """Attach root, subscribe to plugins, and start server.

    Worker threads are attached to the lucene VM as they start, and cherrypy's own autoreloader
    is disabled in favor of the VM-compatible :class:`Autoreloader`.

    :param root,path,config: see cherrypy.quickstart
    :param pidfile,daemonize,autoreload,autoupdate: see command-line options
    :param callback: optional callback function scheduled after daemonizing
    """
    bus = cherrypy.engine
    plugins = cherrypy.process.plugins
    bus.subscribe('start_thread', attach_thread)
    cherrypy.config['engine.autoreload.on'] = False
    if pidfile:
        plugins.PIDFile(bus, os.path.abspath(pidfile)).subscribe()
    if daemonize:
        cherrypy.config['log.screen'] = False
        plugins.Daemonizer(bus).subscribe()
    if autoreload:
        Autoreloader(bus, autoreload).subscribe()
    if callback:
        # a start priority between the Daemonizer's and the Monitors'
        priority = (plugins.Daemonizer.start.priority + plugins.Monitor.start.priority) // 2
        bus.subscribe('start', callback, priority)
    if root is not None:
        mount(root, path, config, autoupdate)
    cherrypy.quickstart(cherrypy.tree.apps.get(path), path, config)
# command-line interface; also usable by custom servers which extend the options
parser = argparse.ArgumentParser(description='Restful json cherrypy server.', prog='lupyne.server')
parser.add_argument('directories', nargs='*', metavar='directory', help='index directories')
parser.add_argument('-r', '--read-only', action='store_true', help='expose only read methods; no write lock')
parser.add_argument('-c', '--config', help='optional configuration file or json object of global params')
parser.add_argument('-p', '--pidfile', metavar='FILE', help='store the process id in the given file')
parser.add_argument('-d', '--daemonize', action='store_true', help='run the server as a daemon')
parser.add_argument('--autoreload', type=float, metavar='SECONDS', help='automatically reload modules; replacement for engine.autoreload')
parser.add_argument('--autoupdate', type=float, metavar='SECONDS', help='automatically update index version and commit any changes')
parser.add_argument('--autosync', metavar='HOST{:PORT}{/PATH},...', help='automatically synchronize searcher with remote hosts and update')
parser.add_argument('--real-time', action='store_true', help='search in real-time without committing')

if __name__ == '__main__':
    args = parser.parse_args()
    # syncing and multiple directories are only supported by a read-only searcher
    read_only = args.read_only or args.autosync or len(args.directories) > 1
    kwargs = {'nrt': True} if args.real_time else {}
    if read_only and (args.real_time or not args.directories):
        parser.error('incompatible read/write options')
    if args.autosync:
        kwargs['hosts'] = args.autosync.split(',')
        if not (args.autoupdate and len(args.directories) == 1):
            parser.error('autosync requires autoupdate and a single directory')
    if args.config and not os.path.exists(args.config):
        # not a file, so it must be an inline json object of global params
        try:
            global_config = json.loads(args.config)
        except ValueError as exc:
            parser.error('config is neither an existing file nor valid json: {}'.format(exc))
        if not isinstance(global_config, dict):
            parser.error('config json must be an object')
        args.config = {'global': global_config}
    cls = WebSearcher if read_only else WebIndexer
    # roots are created uninitialized; init() constructs them after the VM starts
    root = cls.new(*map(os.path.abspath, args.directories), **kwargs)
    del args.directories, args.read_only, args.autosync, args.real_time
    start(root, callback=init, **args.__dict__)