# http://www.devshed.com/c/a/Python/Basic-Threading-in-Python/2/
"""Relay server between a command client and the PCI-9812 DAQ clients.

Command strings of the form ``name(arg, arg, ...)`` are accepted on
ASTRID_PORT, parsed, and forwarded as ``daq:...)`` messages to every DAQ
client that registered itself on DAQ_PORT with a ``client(<name>)`` message.

The module targets Python 2: socket payloads are handled as ``str``.  The
print calls use the single-argument parenthesised form, which produces the
same output under Python 2.
"""

try:
    import Queue
except ImportError:
    # Python 3 spells the module `queue`.  This only keeps the module
    # importable there; the socket payloads are still Python 2 `str`.
    import queue as Queue
import socket
import threading
import string as sg

# *********** Turn simulation on here ***********
SIMULATE = True
#SIMULATE = False

if not SIMULATE:
    pass

#HOST = 'polaris.cv.nrao.edu'
HOST = socket.gethostname()
print('host name: %s' % HOST)
ASTRID_PORT = 50005   # command connections are accepted on this port
DAQ_PORT = 50011      # DAQ clients connect to this port

daq_list = ['paf1']
#daq_list = ['paf1', 'paf2', 'paf3', 'paf4', 'paf5'
#            'paf6', 'paf7', 'paf8', 'paf9', 'paf10']


class ServerThread(threading.Thread):
    """Worker that services accepted command connections from the pool."""

    def __init__(self, parent):
        """Remember *parent*, the server owning `serverPool`."""
        self.parent = parent
        threading.Thread.__init__(self)
        # Workers block forever on the queue; as daemons they no longer keep
        # the process alive once the main thread has exited.
        self.daemon = True

    def run(self):
        """Take connections off the pool, acknowledge and process each one."""
        while True:
            client = self.parent.serverPool.get()
            if client is None:
                continue
            conn = client[0]
            try:
                msg = conn.recv(1024)
                print('received: %s' % msg)
                conn.send('OK')
                self.parent.process_message(msg, conn)
            except Exception as exc:
                # Thread top-level boundary: report the failure and keep the
                # worker alive instead of silently losing it.
                print('ServerThread: request failed: %s' % exc)
            finally:
                # Always release the connection, even when processing failed.
                conn.close()


class PCI9812server(object):
    """Accepts commands on ASTRID_PORT and relays them to the DAQ clients."""

    # Order of the positional arguments expected by init_adc().
    _ADC_FIELDS = ('sample_rate', 'channel', 'read_count', 'adc_res',
                   'trigger_src', 'trigger_mode', 'trigger_pol',
                   'trigger_lvl', 'clksel_src', 'clksel_ad', 'dbl_buf',
                   'v_range')

    def __init__(self):
        """Wait for the DAQ clients, start two workers, then serve forever.

        This constructor does not return: it ends in the accept loop.
        """
        self.skt_mgr = SocketManager(HOST, DAQ_PORT)
        self.buffer = ''
        if not SIMULATE:
            self.default_setup()

        self.serverPool = Queue.Queue(0)
        for _ in range(2):
            ServerThread(self).start()

        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind((HOST, ASTRID_PORT))
        server.listen(5)
        while True:
            self.serverPool.put(server.accept())

    def default_setup(self):
        """Placeholder for the non-simulated default setup."""
        print('--- Default setup here ---')

    def process_message(self, message, client):
        """Parse *message* into commands and dispatch each of them.

        Text after the last ')' is kept in `self.buffer` and prepended to the
        next message.  Malformed or unknown commands, and commands with too
        few arguments, are reported through send_error() instead of raising.

        message -- text received from the command client
        client  -- connection the message arrived on (used for error reports)
        """
        print('msg: %s' % message)
        self.buffer, commands = \
            self.skt_mgr.buffer_manager(self.buffer, message)

        # command name -> (handler, minimum number of arguments)
        handlers = {
            'mkdir': (self.make_directory, 1),
            'init_adc': (self.init_adc, len(self._ADC_FIELDS)),
            'arm_adc': (self.arm_adc, 2),
        }

        for cmd in commands:
            words = cmd.split('(')
            if len(words) < 2:
                # No '(' at all, e.g. "junk)": there is no argument list.
                self.send_error('Malformed 9812 command: %s' % cmd, client)
                continue
            cmd_name = words[0].strip()
            args = words[1].split(')')[0].split(',')
            print('+++ %s %s' % (cmd_name, args))

            if cmd_name == 'close_adc':
                self.close_adc()
                continue
            if cmd_name not in handlers:
                self.send_error('Unknown 9812 command: %s' % cmd, client)
                continue
            handler, min_args = handlers[cmd_name]
            if len(args) < min_args:
                self.send_error(
                    'Too few arguments for 9812 command: %s' % cmd, client)
                continue
            handler(args)

    def send_error(self, message, client):
        """Report *message* locally and, when possible, to *client*.

        NOTE(review): process_message() always called this method but it was
        never defined; sending the text back to the client is the presumed
        intent -- confirm against the command client's protocol.
        """
        print('ERROR: %s' % message)
        if client is None:
            return
        try:
            client.send(message)
        except socket.error as exc:
            print('Could not report error to client: %s' % exc)

    def make_directory(self, args):
        """Send 'daq:mkdir <dir>)' to the DAQ clients; args[0] is the name."""
        dir_name = args[0].strip()
        msg = 'daq:mkdir %s)' % dir_name
        print('paf message: %s' % msg)
        self.skt_mgr.send_paf_messages(msg)

    def init_adc(self, args):
        """Send the 'daq:adc name=value ...)' setup message.

        args -- values in the order given by _ADC_FIELDS; extras are ignored.
        Raises IndexError when fewer values than fields are supplied.
        """
        if len(args) < len(self._ADC_FIELDS):
            raise IndexError('init_adc needs %d arguments, got %d'
                             % (len(self._ADC_FIELDS), len(args)))
        settings = ''.join(' %s=%s' % (name, value.strip())
                           for name, value in zip(self._ADC_FIELDS, args))
        self.skt_mgr.send_paf_messages('daq:adc' + settings + ')')

    def arm_adc(self, args):
        """Send 'daq:paf start ...)'; args are (file name, duration)."""
        file_name = args[0].strip()
        duration = args[1].strip()
        msg = 'daq:paf start fname=%s duration=%s)' % (file_name, duration)
        self.skt_mgr.send_paf_messages(msg)

    def close_adc(self):
        """Send 'daq:end)' to the DAQ clients without waiting for replies."""
        msg = 'daq:end)'
        self.skt_mgr.send_end_messages(msg)


class SocketManager(object):
    """Owns the connections to the DAQ clients listed in `daq_list`."""

    def __init__(self, host, base_port):
        """Listen on (host, base_port) and accept one connection per DAQ.

        A connection is registered under the name from its 'client(<name>)'
        command.  Blocks until len(daq_list) connections have been accepted.
        """
        self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.skt.bind((host, base_port))
        #socket.setdefaulttimeout(1.0)
        print('9812_server(listen): waiting for socket connections.')
        self.skt.listen(len(daq_list))

        # client name -> {'conn', 'addr', 'buffer', 'status'}
        self.skt_dict = {}
        for _ in daq_list:
            pending = ''
            conn, addr = self.skt.accept()
            message = conn.recv(1024)
            pending, commands = self.buffer_manager(pending, message)
            # NOTE(review): a connection whose first recv() holds no complete
            # 'client(...)' command is never registered -- confirm intent.
            for cmd in commands:
                words = cmd.split('(')
                if words[0] == 'client':
                    client_name = words[1].split(')')[0].strip()
                    print('9812_server connected by: "%s"' % client_name)
                    self.skt_dict[client_name] = {'conn': conn,
                                                  'addr': addr,
                                                  'buffer': pending,
                                                  'status': True}
                    conn.settimeout(0.5)
                else:
                    print('Opening command ignored: %s' % cmd)

    def _mark_closed(self, client_name):
        """Flag *client_name* as closed and release its connection."""
        sub_dict = self.skt_dict[client_name]
        sub_dict['status'] = False
        sub_dict['conn'].close()
        print('"%s" socket marked closed' % client_name)

    # This method assumes that commands from all clients are unique
    def check_for_messages(self):
        """Poll every open client once and return the complete commands.

        A client is marked closed when recv() returns no data (the peer shut
        the connection down) or when it sent a 'closing_skt(...)' command.
        """
        all_commands = []
        for client_name in self.skt_dict:
            sub_dict = self.skt_dict[client_name]
            if not sub_dict['status']:
                continue
            try:
                message = sub_dict['conn'].recv(1024)
            except socket.timeout:
                print('!!! check_for_messages() timeout !!!!')
                continue
            if not message:
                self._mark_closed(client_name)
                continue
            sub_dict['buffer'], commands = \
                self.buffer_manager(sub_dict['buffer'], message)
            kept = self.trap_socket_closure(commands)
            if len(kept) != len(commands):
                # The client announced that it is closing this connection.
                self._mark_closed(client_name)
            all_commands.extend(kept)
        return all_commands

    # This method only checks one client
    def check_for_one_message(self, client_name):
        """Return one raw message from *client_name*.

        Returns '' when the client is closed or the read timed out.
        """
        sub_dict = self.skt_dict[client_name]
        if not sub_dict['status']:
            print('**************')
            return ''
        try:
            print('look for message')
            message = sub_dict['conn'].recv(1024)
        except socket.timeout:
            print('!!! check_for_one_message() timeout !!!!')
            return ''
        print('^^^ %s ^^^' % message)
        if not message:
            # Empty read: the peer closed the connection.
            self._mark_closed(client_name)
        return message

    def buffer_manager(self, buffer, msg):
        """Split buffer + msg into ')'-terminated commands.

        Returns (remainder, commands), where remainder is the text after the
        last ')' and each command includes its closing ')'.
        """
        pieces = (buffer + msg).split(')')
        commands = [piece + ')' for piece in pieces[:-1]]
        return pieces[-1], commands

    def send_paf_messages(self, message):
        """Send *message* to each open client and expect the reply '1'."""
        for client_name in self.skt_dict:
            sub_dict = self.skt_dict[client_name]
            if sub_dict['status']:
                sub_dict['conn'].send(message)
                if self.check_for_one_message(client_name) != '1':
                    print('Error processing (%s): %s'
                          % (client_name, message))
            else:
                print('"%s" socket closed, message (%s) not sent'
                      % (client_name, message))

    def send_end_messages(self, message):
        """Send *message* to each open client without waiting for a reply."""
        for client_name in self.skt_dict:
            sub_dict = self.skt_dict[client_name]
            if sub_dict['status']:
                sub_dict['conn'].send(message)
            else:
                print('"%s" socket closed, message (%s) not sent'
                      % (client_name, message))

    def trap_socket_closure(self, commands):
        """Return *commands* without the 'closing_skt(...)' entries."""
        new_commands = []
        for cmd in commands:
            words = cmd.split('(')
            if words[0].strip() == 'closing_skt':
                print('Closing %s socket' % (words[1].split(')')[0]))
            else:
                new_commands.append(cmd)
        return new_commands


if __name__ == '__main__':
    # The constructor never returns (it ends in the accept loop), so importing
    # this module used to block forever; only start the server when the file
    # is run as a script.
    daq = PCI9812server()