root/oss/bucker/scripts/queue-server.py

Revision 673, 5.1 kB (checked in by sylvain, 2 years ago)

Updated to CherryPy? 3.1

Line 
1 #!/usr/bin/env python2.5
2 # -*- coding: utf-8 -*-
3 __doc__ = """
4 This script starts a queue server associating it with one or several memcached servers.
5
6 * Lookup the command line arguments:
7   python queue-server.py --help 
8
9 * Start the server as follow:
10   python queue-server.py --servers 127.0.0.1:11211
11
12 The --port parameter defines on which port the server will be listening. The default value is 9876.
13
14 * Stop the server as follow:
15   kill -USR1 pid
16   kill -15 pid
17
18 Where pid is the process identifier.
19 The first SIGUSR1 signal shutdowns client connections. The SIGTERM signal shutdowns the server.
20
21 If the server isn ot closed that way, it may have unexpected behavior. Notably the KeyboardInterrupt exception will not cleanly shutdown the server.
22 """
23 #import gc
24 #gc.set_debug(gc.DEBUG_LEAK)
25
26 import sys
27 import socket
28
29 from cherrypy.process.wspbus import Bus
30 from cherrypy.process import plugins, servers
31
32 from Kamaelia.Chassis.ConnectedServer import SimpleServer
33
34 from bucker.provider.memcached import MemcachedQueue, MemcachedQueueProtocol
35 from bucker.lib.logger import Logger
36
37 bus = Bus()
38
39 def parse_commandline():
40     from optparse import OptionParser
41     parser = OptionParser()
42     parser.add_option("-s", "--servers", dest="servers",
43                       help="comma separated lists of IP:PORT of memcached servers (default=127.0.0.1:11211)")
44     parser.set_defaults(servers='127.0.0.1:11211')
45     parser.add_option("-p", "--port", dest="port", action="store",
46                        type="int", help="queue server port (default=9876)")
47     parser.set_defaults(port=9876)
48     parser.add_option("-q", "--queue", dest="queue",
49                       help="queue identifier (default=None)", action="store")
50     parser.set_defaults(queue=None)
51     parser.add_option("-t", "--visibility-timeout", type="int",
52                       action="store", dest="visibility",
53                       help="sets the visibility timeout in seconds for each message when accessed (default=30)")
54     parser.set_defaults(visibility=30)
55     parser.add_option("-l", "--logger-path", dest="logger_path", action="store",
56                       help="absolute path to the log file (default=None)")
57     parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
58                       help="log every messages to stdout")
59     parser.add_option("-i", "--pid-file", dest="pidfile", action="store",
60                       help="if set, it must be the path of the file that will contain the PID of the process (default=None)")
61     parser.set_defaults(pidfile=None)
62     parser.add_option("-d", "--daemon", dest="daemon", action="store_true",
63                       help="run server as daemon [UNSUPPORTED FOR NOW]")
64     parser.set_defaults(logger_path=None)
65     (options, args) = parser.parse_args()
66
67     return options
68
69 class Server(object):
70     def __init__(self):
71         self.options = parse_commandline()
72
73         servers = self.options.servers.split(',')
74         self.queue = MemcachedQueue(servers=servers,
75                                     visibility_timeout=int(self.options.visibility),
76                                     queue_list_name=self.options.queue)
77        
78         self.logger = None
79         if self.options.logger_path or self.options.verbose:
80             self.logger = Logger(self.options.logger_path,
81                                  self.options.verbose)
82             self.queue.set_logger(self.logger)
83            
84         MemcachedQueue.setService(self.queue)
85
86     def start(self):
87         self.queue.activate()
88
89         def make_protocol():
90             mqp = MemcachedQueueProtocol()
91             if self.logger:
92                 mqp.set_logger(self.logger)
93             return mqp
94
95         self.server = SimpleServer(protocol=make_protocol,
96                                    port=self.options.port,
97                                    socketOptions=(socket.SOL_SOCKET,
98                                                   socket.SO_REUSEADDR, 1))
99         bus.log("Queue server running on port %d" % self.options.port)
100         self.server.activate()
101     start.priority = 90
102
103     def stop(self):
104         from Kamaelia.Util.OneShot import OneShot
105
106         from Kamaelia.IPC import serverShutdown
107         o = OneShot(msg=serverShutdown())
108         self.server.link((o, 'outbox'), (self.server, 'control'))
109         o.activate()
110
111         from Axon.Ipc import shutdownMicroprocess
112         service, shutdownservice, queue = MemcachedQueue.getService()
113         o = OneShot(msg=shutdownMicroprocess())
114         service[0].link((o, 'outbox'), shutdownservice)
115         o.activate()
116
117     def exit(self):
118         self.server.stop()
119
120 def setup(server):
121     plugins.SignalHandler(bus)
122     if server.options.pidfile != None:
123         plugins.PIDFile(bus, server.options.pidfile)
124     bus.subscribe('start', server.start)
125     bus.subscribe('graceful', server.stop)
126     bus.subscribe('exit', server.exit)
127
128     if 'win' not in sys.platform and server.options.daemon:
129         bus.subscribe('start', restsrv.plugins.daemonize)
130
131 def serve():
132     bus.start()
133     #bus.block()
134     from Axon.Scheduler import scheduler
135     scheduler.run.runThreads(slowmo=0.1)
136
137 if __name__ == '__main__':
138     s = Server()
139     setup(s)
140     serve()
Note: See TracBrowser for help on using the browser.