root/tags/amplee-0.5.0/amplee/indexer/__init__.py

Revision 443, 18.0 kB (checked in by sylvain, 1 year ago)

Added a make_feed method to the Service class that quickly constructs Atom feeds from a mapping of member ids (such as the one returned by the indexer), and added a search box based on that feature to the cooker example.

Line 
1 # -*- coding: utf-8 -*-
2
3 __doc__ = """Really simple indexing implementation for amplee
4
5 I assume something bigger such as Lucene would be much more efficient.
6
7 For example:
8
9 >>> import time
10 >>> from amplee.indexer import *
11 >>> ind = Indexer()
12 >>> container = ShelveContainer('/tmp/cache.p')
13 >>> pi = PublishedIndex('pi', container=container, granularity=DateIndex.month)
14 >>> container2 = ShelveContainer('/tmp/cache2.p')
15 >>> ai = AuthorIndex('ai', container=container2)
16 >>> ind.register(pi)
17 >>> ind.register(ai)
18
19 # With the indexer daemon
20 >>> ind.start_daemon(interval=1.0)
21 >>> try:
22 >>>     index(member)
23 >>>     time.sleep(3)
24 >>> finally:
25 >>>     ind.stop_daemon()
26
27 # Or without
28 >>> ind.process(member)
29
30 >>> from datetime import datetime
31 >>> start = datetime(2006, 10, 8, 14, 00, 12)
32 >>> end = datetime(2008, 10, 8, 16, 00, 12)
33 >>> r0 = pi.between(start, end)
34 >>> print r0
35 >>> r1 = ai.lookup('Jon Doe')
36 >>> print r1
37
38 # You can even intersect both sets of results (members present in both)
39 >>> print r0 & r1
40
41 # Or subtract (assuming r2); "-" binds tighter than "&", so this is r0 & (r2 - r1)
42 >>> print r0 & r2 - r1
43
44 Once you get a result set you can do something like this:
45
46 >>> collection_name, member_id = r.pop()
47 >>> c0 = service.get_collection(collection_name)
48 >>> member = c0.get_member(member_id)
49
50 Or to get a list of member instances.
51
52 >>> items = ind.to_dict(r)
53 >>> for collection_name in items:
54         c = service.get_collection(collection_name)
55         members = c.reload_members_from_list(items[collection_name])
56 """
57
58 from Queue import Queue, Empty
59 import sha
60 import threading
61 from datetime import datetime
62 import re
63 import shelve
64 from sets import Set
65
66 from amplee.utils import parse_isodate
67
# Public names exported by ``from amplee.indexer import *``.
__all__ = [
    'Indexer',
    'BaseIndex',
    'PublishedIndex',
    'AuthorIndex',
    'CategoryIndex',
    'UpdatedIndex',
    'DateIndex',
    'MemoryContainer',
    'index',
    'ShelveContainer',
    'MemcacheContainer',
    'KeywordIndex',
    'AtomIDIndexer',
]

# Module-wide queue of members waiting to be indexed: ``index()`` puts
# members on it and ``Indexer.apply_all()`` takes them off.
_queue = Queue()
74
def index(member):
    """Schedule ``member`` for indexing.

    ``member`` is an instance of any of the member classes provided by
    amplee.

    The member is only put on the module-level queue here. De-queuing is
    asynchronous (it happens when ``Indexer.apply_all`` runs), so this
    function returns before the member is actually indexed.
    """
    pending = _queue
    pending.put(member)
85
class MemoryContainer(object):
    """Dummy in-memory container backed by a plain dictionary.

    Nothing is persisted and no thread locking is done, so be careful
    when sharing one instance between threads.
    """

    def __init__(self):
        self._container = {}

    def shutdown(self):
        """Do nothing: there is no external resource to release."""
        pass

    def __contains__(self, key):
        return key in self._container

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Raises ``KeyError`` carrying the missing key when it is absent.
        """
        # The dictionary itself raises KeyError(key), which is more useful
        # to the caller than a bare KeyError with no argument.
        return self._container[key]

    def __setitem__(self, key, value):
        self._container[key] = value

    def __delitem__(self, key):
        """Remove ``key``; a missing key is silently ignored."""
        self._container.pop(key, None)

    def __iter__(self):
        return iter(self._container)

    def iterkeys(self):
        """Return an iterator over the stored keys."""
        # iter() on a dict walks its keys and does not depend on the
        # ``dict.iterkeys`` method being available.
        return iter(self._container)
117    
class ShelveContainer(object):
    def __init__(self, abs_path, protocol=2):
        """Simple container around the built-in shelve module.
        This provides a simple and more-or-less efficient
        persistence system.

        The ``abs_path`` is the absolute path of the shelve to
        open (and create if it does not exist).

        The ``protocol`` is the protocol value used by the open()
        function of the shelve module.

        If you plan to create your own container, it must provide the
        dictionary-style methods below plus ``shutdown``, which is what
        ``Indexer.shutdown`` calls on every registered container.
        """
        self.abs_path = abs_path
        self.protocol = protocol
        # writeback=True keeps accessed entries cached in memory; they are
        # written out by the sync() calls below and by close().
        self.shelf = shelve.open(self.abs_path, protocol=self.protocol,
                                 writeback=True)

    def shutdown(self):
        """Closes the shelve container when not needed anymore."""
        self.shelf.close()

    def __contains__(self, key):
        return key in self.shelf

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Raises ``KeyError`` carrying the missing key when it is absent.
        """
        if key in self.shelf:
            return self.shelf[key]
        raise KeyError(key)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` and flush the shelf."""
        self.shelf[key] = value
        self.shelf.sync()

    def __delitem__(self, key):
        """Remove ``key`` (ignored when missing) and flush the shelf."""
        if key in self.shelf:
            del self.shelf[key]
        self.shelf.sync()

    def __iter__(self):
        return iter(self.shelf)

    def iterkeys(self):
        """Return an iterator over the stored keys."""
        # Iterating the shelf yields its keys and does not depend on the
        # shelf object offering an ``iterkeys`` method.
        return iter(self.shelf)
164    
class MemcacheContainer(object):
    def __init__(self, servers, cache_key=None):
        """
        Container based on the memcached tool for highly
        efficient and distributed memory cache.

        To use this container you must have either
        cmemcache or python-memcache installed.

        The ``servers`` argument is a list of byte strings
        (not unicode) respecting the format of memcached
        connection strings.

        The ``cache_key`` is a unique value that will be used
        to track the indexes within memcache. If not
        provided a random string will be created.

        Only one key is created within memcache; its value is a
        dictionary in which all the index entries are stored.

        NOTE(review): the dictionary is reset to ``{}`` on every
        instantiation, even for a caller-supplied ``cache_key`` --
        confirm this is intended.
        """
        try:
            from cmemcache import Client
        except ImportError:
            from memcache import Client

        if not cache_key:
            import random
            import string
            # ascii_letters (unlike string.letters) does not depend on the
            # current locale, so the generated key is always plain ASCII.
            chars = string.ascii_letters + string.octdigits + '_-'
            cache_key = ''.join(random.sample(chars, 25))

        self.cache_key = cache_key
        self.mc = Client(servers)
        self.mc.set(self.cache_key, {})

    def shutdown(self):
        """Closes down all connections to memcached nodes."""
        self.mc.disconnect_all()

    def _load(self):
        """Return the index dictionary held under ``self.cache_key``.

        An empty dictionary is returned when the client has no value for
        the key (``get`` returned ``None``).
        """
        mapping = self.mc.get(self.cache_key)
        if mapping is None:
            return {}
        return mapping

    def _save(self, mapping):
        """Write the whole index dictionary back under ``self.cache_key``."""
        self.mc.set(self.cache_key, mapping)

    def __contains__(self, key):
        return key in self._load()

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Raises ``KeyError`` carrying the missing key when it is absent.
        """
        mapping = self._load()
        if key in mapping:
            return mapping[key]
        raise KeyError(key)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` inside the index dictionary."""
        mapping = self._load()
        mapping[key] = value
        # The dictionary must go back under the container's cache key; the
        # entry key is only a key *inside* that dictionary.
        self._save(mapping)

    def __delitem__(self, key):
        """Remove ``key`` from the index dictionary (ignored when missing)."""
        mapping = self._load()
        if key in mapping:
            del mapping[key]
        self._save(mapping)

    def __iter__(self):
        # __iter__ has to return an iterator, not a list.
        return iter(list(self._load().keys()))

    def iterkeys(self):
        """Return the keys currently stored, as a list snapshot."""
        return list(self._load().keys())
233
# ``threading._Timer`` is a private name that not every interpreter
# provides; where it is missing, the timer class is ``threading.Timer``.
_TimerBase = getattr(threading, '_Timer', None) or threading.Timer


# Stolen from CherryPy
class CyclicTimer(_TimerBase):
    """A thread timer that runs until it is explicitly stopped.

    The wrapped function is called every ``interval`` seconds with the
    ``args`` and ``kwargs`` given to the constructor, until ``cancel()``
    sets the ``finished`` event.
    """
    def run(self):
        while True:
            # Sleep for one interval, waking up early if cancel() is called.
            self.finished.wait(self.interval)
            if self.finished.isSet():
                break
            self.function(*self.args, **self.kwargs)
244
class Indexer(object):
    def __init__(self, batch=5):
        """This class is the manager of your index handlers.
        You register your index handlers and then you start the
        indexer daemon which will run at regular interval, polling
        from the global queue members to index.

        If you have your own event mechanism you don't need to
        start the daemon and can simply call `apply_all` to
        perform the same job at will.

        The ``batch`` parameter indicates how many members
        to dequeue during one iteration of indexing.
        """
        self.indexes = {}
        self.index_timer = None
        self.batch = batch

    def register(self, index):
        """Registers a new index handler under its ``name``."""
        self.indexes[index.name] = index

    def unregister(self, index):
        """Removes an index handler (ignored when not registered)."""
        if index.name in self.indexes:
            del self.indexes[index.name]

    def retrieve(self, name):
        """Returns the index handler per its name.

        Raises ``KeyError`` when no handler has that name.
        """
        return self.indexes[name]

    def shutdown(self):
        """Shutdown the container of every registered index handler."""
        for name in self.indexes:
            self.indexes[name].container.shutdown()

    def start_daemon(self, interval=60.0):
        """Convenient way to start a timer thread that will
        process the queue of indexable members every ``interval``
        seconds.

        Raises ``RuntimeError`` if the timer is already running.
        """
        if self.index_timer is not None:
            raise RuntimeError("Index daemon already started")

        timer = CyclicTimer(interval, self.apply_all)
        timer.setName('Amplee indexer')
        self.index_timer = timer
        self.index_timer.start()

    def stop_daemon(self):
        """Stops the timer thread, then shuts the containers down."""
        # Stop the timer first and wait for it: shutting the containers
        # down while a tick is still running would let an index handler
        # write to an already closed container.
        if self.index_timer is not None:
            self.index_timer.cancel()
            self.index_timer.join()
            self.index_timer = None
        self.shutdown()

    def process(self, member):
        """Indexes the provided member with every registered handler."""
        for name in self.indexes:
            self.indexes[name].update(member)

    def apply_all(self):
        """Unqueue up to `self.batch` number
        of members from the global queue and apply
        all the index handlers.

        A handler only needs to be a class that implements
        a method named `update` and taking the `member` as its
        unique parameter.
        """
        for unused in range(self.batch):
            try:
                member = _queue.get_nowait()
            except Empty:
                # Nothing left to index during this iteration.
                return
            self.process(member)

    def to_dict(self, result):
        """Returns the given Set as a dictionary of the form:

        {collection_name: [member_ids]}
        """
        members = {}
        for (collection_name, member_id) in result:
            members.setdefault(collection_name, []).append(member_id)

        return members
338    
class BaseIndex(object):
    def __init__(self, name, container=None):
        """
        Base class of your index handler.

        Built-in index handlers will always inherit from
        this class but your own handlers don't need to
        as long as they implement a method named `update`
        and taking the `member` as its unique parameter.

        The ``name`` parameter is an internal identifier.

        The ``container`` is an instance of an object
        implementing the dictionary interface (``in``, item
        access and assignment, and ``iterkeys``).
        """
        self.name = name
        self.container = container

    def update(self, member):
        """Index ``member``; must be overridden by subclasses.

        Raises ``NotImplementedError`` here.
        """
        # NotImplemented is a comparison sentinel, not an exception; raising
        # it only produces an unrelated TypeError.
        raise NotImplementedError

    def store(self, key, value):
        """Adds ``value`` to the set stored under ``key``

        The ``key`` and ``value`` can be any object
        as long as the container can cope with it.
        """
        if key not in self.container:
            self.container[key] = Set([value])
        else:
            # Re-assign after mutating so that the container is told about
            # the change.
            values = self.container[key]
            values.add(value)
            self.container[key] = values

    def load(self, key):
        """Returns the set of values stored under ``key``, or an
        empty set when the key is unknown."""
        if key in self.container:
            return self.container[key]

        return Set()

    def iterindex(self, func):
        """Iterates through the keys of the
        container and apply for each one the provided ``func``
        with the key and the data associated.

        If you want to stop the iteration you can
        simply raise StopIteration from the function.

        The method returns a set gathering the results of the
        function calls: sets and lists are merged in, ``None`` is
        skipped and any other value is added as a single element.
        """
        result = Set()
        try:
            for key in self.container.iterkeys():
                value = func(key, self.container[key])
                if isinstance(value, (Set, list)):
                    result.update(value)
                elif value is not None:
                    result.add(value)
        except StopIteration:
            pass

        return result

    def keys(self):
        """Return all the existing keys in the container as a list.
        Not all containers support that feature, so it should
        be used carefully.
        """
        return [key for key in self.container.iterkeys()]
414
class DateIndex(BaseIndex):
    def __init__(self, name, target, container=None, granularity=None):
        """Base class for date based index handlers.

        The ``target`` is the name of the element within the atom entry
        to search for.

        The ``granularity`` is one of the classmethods of this class
        (``year``, ``month``, ``day``, ``hour`` or ``minute``)
        that indicates what will be the granularity of the key of the index.

        The lower the finer but also the more keys you will have. It
        defaults to ``DateIndex.hour``.
        """
        BaseIndex.__init__(self, name, container)
        if not granularity:
            granularity = DateIndex.hour
        self.granularity = granularity
        self.target = target

    def update(self, member):
        """Indexes ``member`` under the truncated date of its target
        element, when the entry has one."""
        entry = member.atom
        element = entry.get_child(self.target, entry.xml_ns)
        if element:
            # The key is the string form of the date truncated to the
            # configured granularity.
            stamp = self.granularity(parse_isodate(element.xml_text))
            self.store(str(stamp), (member.collection.name_or_id, member.member_id))

    def between(self, start, end):
        """Returns the set of (collection_name, member_id) tuples whose
        index key falls between the two provided dates (both included).

        The comparison is made on the stored keys, which are dates
        truncated to the index granularity, not on the original dates.
        """
        def _between(key, data):
            # NOTE(review): assumes parse_isodate returns a value that can
            # be compared with ``start`` and ``end`` -- confirm.
            dt = parse_isodate(key)
            if start <= dt <= end:
                return data

        return self.iterindex(_between)

    def year(cls, dt):
        """Truncates ``dt`` to the first instant of its year."""
        return datetime(dt.year, 1, 1, 0, 0)
    year = classmethod(year)

    def month(cls, dt):
        """Truncates ``dt`` to the first instant of its month."""
        return datetime(dt.year, dt.month, 1, 0, 0)
    month = classmethod(month)

    def day(cls, dt):
        """Truncates ``dt`` to midnight of its day."""
        return datetime(dt.year, dt.month, dt.day, 0, 0)
    # This used to be assigned to ``year``, which replaced the year
    # granularity with the day one and left ``day`` as a plain function.
    day = classmethod(day)

    def hour(cls, dt):
        """Truncates ``dt`` to the start of its hour."""
        return datetime(dt.year, dt.month, dt.day, dt.hour, 0)
    hour = classmethod(hour)

    def minute(cls, dt):
        """Truncates ``dt`` to the start of its minute."""
        return datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute)
    minute = classmethod(minute)
471            
class PublishedIndex(DateIndex):
    def __init__(self, name, container=None, granularity=None):
        """Date index built on the ``published`` element of the entry."""
        DateIndex.__init__(self, name, target='published',
                           container=container, granularity=granularity)
475    
class UpdatedIndex(DateIndex):
    def __init__(self, name, container=None, granularity=None):
        """Date index built on the ``updated`` element of the entry."""
        DateIndex.__init__(self, name, target='updated',
                           container=container, granularity=granularity)
479    
class AuthorIndex(BaseIndex):
    def __init__(self, name, container=None, index_name=True, index_uri=False, index_email=False):
        """Simple author indexing.

        The index key is the sha hash value of the concatenation of
        the author name, uri and email; each of the three only takes
        part when its ``index_*`` flag is enabled and counts as an
        empty string otherwise.
        """
        BaseIndex.__init__(self, name, container)
        self.index_name = index_name
        self.index_uri = index_uri
        self.index_email = index_email

    def _hash(self, name, uri, email):
        """Returns the hexadecimal sha digest used as index key."""
        return sha.new('%s%s%s' % (name, uri, email)).hexdigest()

    def update(self, member):
        """Indexes ``member`` once per author found in its atom entry."""
        entry = member.atom
        authors = entry.get_children('author', entry.xml_ns)
        if not authors:
            return

        for author in authors:
            # NOTE(review): the values hashed here are whatever get_child
            # returns, formatted with %s, while lookup() hashes the plain
            # arguments it is given -- confirm both render the same text.
            name, uri, email = '', '', ''
            if self.index_name:
                name = author.get_child('name', author.xml_ns)
            if self.index_uri:
                uri = author.get_child('uri', author.xml_ns)
            if self.index_email:
                email = author.get_child('email', author.xml_ns)

            self.store(self._hash(name, uri, email),
                       (member.collection.name_or_id, member.member_id))

    def lookup(self, name='', uri='', email=''):
        """Returns the set of (collection_name, member_id) tuples
        indexed for the given author details."""
        return self.load(self._hash(name, uri, email))
511
class CategoryIndex(BaseIndex):
    def __init__(self, name, container=None, index_term=True, index_scheme=False):
        """Simple category indexing.

        The index key is the sha hash value of the concatenation of
        the category term and scheme; each one only takes part when
        its ``index_*`` flag is enabled and counts as an empty string
        otherwise.
        """
        BaseIndex.__init__(self, name, container)
        self.index_term = index_term
        self.index_scheme = index_scheme

    def _hash(self, term, scheme):
        """Returns the hexadecimal sha digest used as index key."""
        return sha.new('%s%s' % (term, scheme)).hexdigest()

    def update(self, member):
        """Indexes ``member`` once per category found in its atom entry."""
        entry = member.atom
        categories = entry.get_children('category', entry.xml_ns)
        if not categories:
            return

        for category in categories:
            # NOTE(review): the values hashed here are whatever
            # get_attribute returns, formatted with %s, while lookup()
            # hashes the plain arguments it is given -- confirm both
            # render the same text.
            term, scheme = '', ''
            if self.index_term:
                term = category.get_attribute('term')
            if self.index_scheme:
                scheme = category.get_attribute('scheme')

            self.store(self._hash(term, scheme),
                       (member.collection.name_or_id, member.member_id))

    def lookup(self, term='', scheme=''):
        """Returns the set of (collection_name, member_id) tuples
        indexed for the given term and scheme."""
        return self.load(self._hash(term, scheme))
537
class KeywordIndex(BaseIndex):
    def __init__(self, name, container=None, keywords=None):
        """Simple keyword indexing on the content element

        The ``keywords`` is a list of strings. They are compiled as
        regular expressions without escaping, so a keyword may
        itself be a pattern. When omitted nothing gets indexed.
        """
        BaseIndex.__init__(self, name, container)
        if keywords is None:
            # '|'.join(None) would fail; no keyword means nothing to index.
            keywords = []
        self.keywords = keywords
        # Combined pattern: tells in one scan whether any keyword matches.
        self._r = re.compile('|'.join(self.keywords))
        # One pattern per keyword: tells which keywords actually match.
        self._patterns = [(keyword, re.compile(keyword))
                          for keyword in self.keywords]

    def update(self, member):
        """Indexes ``member`` under each keyword found in its content.

        Only content elements whose ``type`` attribute is ``text``
        are considered.
        """
        entry = member.atom
        content = entry.get_child('content', entry.xml_ns)
        if not content:
            return

        content_type = content.get_attribute('type')
        if unicode(content_type) != u'text':
            return

        text = content.xml_text
        if self._r.search(text) is None:
            return

        # Store the member only under the keywords present in the text,
        # so that contains() does not report it for the other ones.
        for keyword, pattern in self._patterns:
            if pattern.search(text) is not None:
                self.store(keyword, (member.collection.name_or_id, member.member_id))

    def contains(self, keyword):
        """Returns the set of (collection_name, member_id) tuples
        indexed under ``keyword``."""
        return self.load(keyword)
559
class AtomIDIndexer(BaseIndex):
    def __init__(self, name, container=None):
        """Indexing atom:id values"""
        BaseIndex.__init__(self, name, container)

    def update(self, member):
        """Indexes ``member`` under the UTF-8 encoded text of the
        ``id`` element of its atom entry, when there is one."""
        entry = member.atom
        entry_id = entry.get_child('id', entry.xml_ns)
        if entry_id:
            key = entry_id.xml_text.encode('utf-8')
            self.store(key, (member.collection.name_or_id, member.member_id))

    def lookup(self, entry_id):
        """Returns the set of (collection_name, member_id) tuples
        indexed under the given atom:id value.

        ``entry_id`` may be a UTF-8 encoded byte string or a unicode
        string.
        """
        # update() stores UTF-8 encoded keys, so a unicode identifier is
        # encoded the same way before being looked up.
        if isinstance(entry_id, unicode):
            entry_id = entry_id.encode('utf-8')
        return self.load(entry_id)
Note: See TracBrowser for help on using the browser.