#!/usr/bin/env python
#Copyright 2005,2006,2007 Sebastian Hagen
# This file is part of eucharis.

# eucharis is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2
# as published by the Free Software Foundation

# eucharis is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with eucharis; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA

import os
import socket
import thread
import threading
import popen2
import logging

import socket_management

logger = logging.getLogger('asynchronous_processing')


class child_execute:
    """Run an external command in a daemon thread and report back via a timer.

    The worker thread is started from the constructor. Once the child has
    exited, an already-expired timer for the callback is registered with
    socket_management and its notification pipe is poked.
    """

    def __init__(self, command, callback_target, callback_args=(), callback_kwargs=None):
        """Validate the arguments and start the worker thread.

        command: non-empty command, handed unchanged to popen2.Popen3.
        callback_target: callable scheduled once the child has exited.
        callback_args: positional arguments for the callback.
        callback_kwargs: extra keyword arguments for the callback.

        Raises ValueError if command is empty.
        """
        if not command:
            raise ValueError('Insufficient length of command argument (expected at least 1 element)')

        self.command = command
        self.callback_target = callback_target
        self.callback_args = callback_args
        # Private copy: never share a default dict between instances and
        # never write into a dict the caller still owns.
        self.callback_kwargs = dict(callback_kwargs or {})

        self.thread = threading.Thread(group=None, target=self.run_command, name=None, args=(), kwargs={})
        self.thread.setDaemon(True)
        self.thread.start()

    def run_command(self):
        """Spawns the process, and after its exit returns the result by
        interrupting a currently blocking select.select() call after adding
        an expired timer.

        callback syntax (results are passed as keywords):
        callback(*callback_args, child_stdout=..., child_stderr=...,
                 child_return_value=..., **callback_kwargs)
        """
        # Second argument True makes Popen3 capture the child's stderr as well.
        self.child = popen2.Popen3(self.command, True)
        # NOTE(review): stdout is drained completely before stderr is read; a
        # child that fills the stderr pipe first can block forever. Confirm
        # that the commands run through here produce little stderr output.
        self.child_stdout = self.child.fromchild.read()
        self.child_stderr = self.child.childerr.read()
        self.child_return_value = self.child.wait()

        # Hand the results over as keywords, as the callback contract above
        # documents; build a fresh dict so the stored kwargs stay untouched.
        result_kwargs = dict(self.callback_kwargs)
        result_kwargs.update({
            'child_stdout': self.child_stdout,
            'child_stderr': self.child_stderr,
            'child_return_value': self.child_return_value,
        })

        # A negative delay registers a timer that is already expired.
        socket_management.timers_add(delay=-1000, callback_handler=self.callback_target, args=self.callback_args, kwargs=result_kwargs, parent=None)
        socket_management.pipe_notify.notify()


class dns_lookup_generic:
    """Base class for name lookups performed in a daemon thread.

    Subclasses implement lookup_data(); the lookup is started from the
    constructor and its result is delivered through an expired timer.
    """

    def __init__(self, request, callback_target, callback_args=(), callback_kwargs=None):
        """Validate the request and start the lookup thread.

        request: non-empty str to look up.
        callback_target: callable scheduled when the lookup succeeds.
        callback_args: positional arguments for the callback.
        callback_kwargs: extra keyword arguments for the callback.

        Raises TypeError if request is not a str and ValueError if it is empty.
        """
        if not isinstance(request, str):
            raise TypeError('Got hostname argument %s of invalid type %s (exepected str).' % (request, type(request)))
        if not request:
            raise ValueError('Got invalid hostname argument %s (expected at least one character).' % (request,))

        self.callback_target = callback_target
        self.callback_args = callback_args
        # Private copy: perform_task() adds keys to this dict, which must not
        # leak into other instances or into the caller's own dict.
        self.callback_kwargs = dict(callback_kwargs or {})

        self.thread = threading.Thread(group=None, target=self.perform_task, name=None, args=(request,), kwargs={})
        self.thread.setDaemon(True)
        self.thread.start()

    def lookup_data(self, request):
        """Perform the actual lookup; must be provided by subclasses."""
        raise NotImplementedError('lookup_data() has to be implemented by subclasses.')

    def perform_task(self, request_data):
        """Performs the lookup, and adds an expired timer to return the
        result and interrupts a currently blocking select.select()-call on
        success.

        callback-syntax:
        callback(*callback_args, request=request_data, response=response_data,
                 **callback_kwargs)

        A socket.error raised by the lookup is logged and no callback is
        scheduled.
        """
        try:
            response_data = self.lookup_data(request_data)
        except socket.error:
            # Let logging do the formatting; 20 in the original is logging.INFO.
            logger.log(
                logging.INFO,
                'Unable to perform lookup. request_data: %s callback_target: %s callback_args: %s callback_kwargs: %s Error:',
                request_data, self.callback_target, self.callback_args, self.callback_kwargs,
                exc_info=True)
        else:
            self.callback_kwargs.update({'request': request_data, 'response': response_data})
            # A negative delay registers a timer that is already expired.
            socket_management.timers_add(delay=-10000, callback_handler=self.callback_target, args=self.callback_args, kwargs=self.callback_kwargs, parent=None)
            socket_management.pipe_notify.notify()


class gethostbyname(dns_lookup_generic):
    """Look up an address for a host name."""

    def lookup_data(self, request):
        """Return the address part of the first socket.getaddrinfo() result."""
        #This should be ipv6 compatible.
        return socket.getaddrinfo(request, None)[0][-1][0]


class gethostbyaddr(dns_lookup_generic):
    """Reverse lookup for an address."""

    def lookup_data(self, request):
        """Return the result of socket.gethostbyaddr() for request."""
        return socket.gethostbyaddr(request)