#!/usr/bin/env python #Copyright 2004,2006 Sebastian Hagen # This file is part of eucharis. # eucharis is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 # as published by the Free Software Foundation # eucharis is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with eucharis; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # Very partial domain name service server implementation. import socket_management import socket import struct import logging from socket_management import fd_wrap class_defaults = {} class_defaults['dns_server'] = {} constants = {} constants['types_query'] = { 1:('A', 'a host address'), 2:('NS', 'an authoritative name server'), 3:('MD', 'a mail destination (Obsolete - use MX)'), 4:('MF', 'a mail forwarder (Obsolete - use MX)'), 6:('CNAME', 'the canonical name for an alias'), 6:('SOA', 'marks the start of a zone of authority'), 7:('MB', 'a mailbox domain name (EXPERIMENTAL)'), 8:('MG', 'a mail group member (EXPERIMENTAL)'), 9:('MR', 'a mail rename domain name (EXPERIMENTAL)'), 10:('NULL', 'a null RR (EXPERIMENTAL)'), 11:('WKS', 'a well known service description'), 12:('PTR', 'a domain name pointer'), 13:('HINFO', 'host information'), 14:('MINFO', 'mailbox or mail list information'), 15:('MX', 'mail exchange'), 16:('TXT', 'text strings'), 252:('AXFR', 'A request for a transfer of an entire zone'), 253:('MAILB', 'A request for mailbox-related records (MB, MG or MR)'), 254:('MAILA', 'A request for mail agent RRs (Obsolete - see MX)'), 255:('*', 'A request for all records'), } constants['classes_query'] = { 1:('IN', 'the Internet'), 2:('CS', 'the CSNET class (Obsolete - used only for examples in some obsolete RFCs)'), 3:('CH', 'the CHAOS class'), 4:('HS', 'Hesiod [Dyer 87]'), 255:('*', 'any class'), } constants['response_codes'] = { 0:'No error condition', 1:'Format error - The name server was unable to interpret the query.', 2:'Server failure - The name server was unable to process this query due to a problem with the name server.', 3:'Name Error - Meaningful only for responses from an authoritative name server, this code signifies that the domain name referenced in the query does not exist.', 4:'Not Implemented - The name server does not support the requested kind of query.', 5:'Refused - The name server refuses to perform the specified operation for policy reasons. For example, a name server may not wish to provide the information to the particular requester, or a name server may not wish to perform a particular operation (e.g., zone transfer) for particular data.', } class dns_server: def __init__(self, resolveprocedure=None): self.logger = logging.getLogger('dns_server') self.queue = {} self.queue['udp'] = {} self.resolver = resolveprocedure def start_listening_udp(self, dgram_socket): """Start listening on a dgram socket passed as argument.""" self.fd = fd_wrap(dgram_socket.fileno(), self, dgram_socket) def fd_read(self, fd): """Read data from a udp socket.""" data, address = self.fd.file.recvfrom(1073741824) if (address in self.queue['udp']): self.queue['udp'][address] = self.queue['udp'][address] + data else: self.queue['udp'][address] = data self.data_process_dgram(address) def output_send_dgram(self, address, data): """Send data through a udp socket.""" self.socket.sendto(data, address) def output_generate(self, id_string, opcode, authoritativeness=1, recursion_desired=1, response_code=0, questions=[], answers=[], authoritative_nameservers=[], additional_records=[]): """Return a valid binary dns-response generated from the data passed as arguments.""" output_data = {} output_data['purpose_identifier'] = 1 << 15 output_data['opcode'] = opcode << 11 output_data['authoritativeness'] = authoritativeness << 10 output_data['truncatedness'] = 0 << 9 output_data['recursion_desired'] = recursion_desired << 8 output_data['recursion_available'] = 1 << 7 output_data['reserved_bytes'] = 0 << 3 output_data['response_code'] = response_code output_data['counts'] = {} output_data['counts']['questions'] = len(questions) output_data['counts']['answers'] = len(answers) output_data['counts']['authoritative_nameservers'] = len(authoritative_nameservers) output_data['counts']['additional_records'] = len(additional_records) output_data['binarystring'] = output_data['purpose_identifier'] | output_data['opcode'] | output_data['authoritativeness'] | output_data['truncatedness'] | output_data['recursion_desired'] | output_data['recursion_available'] | output_data['reserved_bytes'] | output_data['response_code'] output_binary = {} output_binary['header'] = struct.pack('>2s5H', id_string, output_data['binarystring'], output_data['counts']['questions'], output_data['counts']['answers'], output_data['counts']['authoritative_nameservers'], output_data['counts']['additional_records']) output_binary['questions'] = '' output_binary['resource_records'] = '' rr_defaults = { 'domain_name':None, 'data':None, 'ttl':3600, 'type':1, 'class':1, } for question_unit in questions: if (len(question_unit) != 3): self.logger(35, 'Got passed question unit of invalid length; ignoring. Dump: %s' % (question_unit,)) continue question_binary = self.output_generate_resource_question(domain_name=question_unit[0], query_type=question_unit[1], query_class=question_unit[2]) output_binary['questions'] = output_binary['questions'] + question_binary for rr_dataunit in answers + authoritative_nameservers + additional_records: if ((not (rr_dataunit.has_key('domain_name'))) or (not (rr_dataunit.has_key('data')))): continue rr_data = {} for key in rr_defaults.keys(): if rr_dataunit.has_key(key): rr_data[key] = rr_dataunit[key] else: rr_data[key] = rr_defaults[key] resource_record_binary = self.output_generate_resource_record(domain_name=rr_data['domain_name'], rr_data=rr_data['data'], time_to_live=rr_data['ttl'], rr_type=rr_data['type'], rr_class=rr_data['class']) output_binary['resource_records'] = output_binary['resource_records'] + resource_record_binary return output_binary['header'] + output_binary['questions'] + output_binary['resource_records'] def convert_domain_name(self, domain_name): if (type(domain_name) in (tuple, list)): pass elif (type(domain_name) == str): domain_name = domain_name.split('.') else: raise TypeError('Data passed as domain name (' + str(domain_name) + ') is neither a string, a tuple nor a list.') return domain_name def convert_domain_name_output(self, domain_name): domain_name = self.convert_domain_name(domain_name) domain_packed = '' for sub_domain_name in domain_name: if (len(sub_domain_name) > 255): raise ValueError('Domain-name-fragment ' + sub_domain_name + " is longer than 255 characters. Can't handle this.") domain_packed = domain_packed + chr(len(sub_domain_name)) + sub_domain_name domain_packed = domain_packed + '\000' return domain_packed def output_generate_resource_record(self, domain_name, rr_data, time_to_live=3600, rr_type=1, rr_class=1): """Return a resource-record string generated from data passed as arguments.""" output_binary = {} while ((len(domain_name) > 0) and (domain_name[-1] == '')): del domain_name[-1] output_binary['name'] = self.convert_domain_name_output(domain_name) output_binary['type'] = struct.pack('>H', rr_type) output_binary['class'] = struct.pack('>H', rr_class) output_binary['ttl'] = struct.pack('>l', time_to_live) if (rr_type == 1): #A RDATA output_binary['data'] = socket.inet_aton(rr_data) elif (rr_type == 12): #PTR RDATA output_binary['data'] = self.convert_domain_name_output(rr_data) else: raise NotImplementedError('%s RDATA type not implemented.' % (rr_type)) output_binary['data_length'] = struct.pack('>H', len(output_binary['data'])) return output_binary['name'] + output_binary['type'] + output_binary['class'] + output_binary['ttl'] + output_binary['data_length'] + output_binary['data'] def output_generate_resource_question(self, domain_name, query_type, query_class): return self.convert_domain_name_output(domain_name) + struct.pack('>HH', query_type, query_class) def data_process_dgram(self, address): """Parse incoming requests for dns data.""" request_data = { 'counts':{}, 'id_string':'\000\000', 'ipcode':0, 'questions':[], 'answers':[], 'authoritative_nameservers':[], 'additional_records':[], } try: input_rawdata = self.queue['udp'].pop(address) request_data['id_string'], data_binary, request_data['counts']['questions'], request_data['counts']['answers'], request_data['counts']['authoritative_nameservers'], request_data['counts']['additional_records'] = struct.unpack('>2s5H', input_rawdata[0:12]) #Now the bit-shifting. Somewhat of ugly, but I don't know any better way of doing this. request_data['purpose_identifier'] = (data_binary & 32768) >> 15 request_data['opcode'] = (data_binary & 30720) >> 11 request_data['authoritativeness'] = (data_binary & 1024) >> 10 request_data['truncatedness'] = (data_binary & 512) >> 9 request_data['recursion_desired'] = (data_binary & 256) >> 8 request_data['recursion_available'] = (data_binary & 128) >> 7 request_data['reserved_bytes'] = (data_binary & 120) >> 3 request_data['response_code'] = data_binary & 15 data_extracted_count = {'questions':0, 'answers':0, 'authoritative_nameservers':0, 'additional_records':0} input_rawdata_index = 12 for data_type in ['questions', 'answers', 'authoritative_nameservers', 'additional_records']: while (data_extracted_count[data_type] < request_data['counts'][data_type]): if (data_type == 'questions'): current_data_list = [[],0,''] while (1): label_length = ord(input_rawdata[input_rawdata_index]) if (label_length == 0): break current_data_string = input_rawdata[input_rawdata_index + 1:input_rawdata_index + label_length + 1] current_data_list[0].append(current_data_string) input_rawdata_index = input_rawdata_index + label_length + 1 input_rawdata_index = input_rawdata_index +1 query_type, query_class = struct.unpack('>2H', input_rawdata[input_rawdata_index:input_rawdata_index + 4]) current_data_list[1] = query_type current_data_list[2] = query_class else: break input_rawdata_index = input_rawdata_index + label_length + 2 data_extracted_count[data_type] = data_extracted_count[data_type] + 1 request_data[data_type].append(current_data_list) except struct.error: self.logger.log(40, "Can't process input.", exc_info=True) else: response_data = { 'id_string':request_data['id_string'], 'opcode':request_data['opcode'], 'authoritativeness':1, 'recursion_desired':1, 'response_code':0, 'questions':request_data['questions'], 'answers':[], 'authoritative_nameservers':[], 'additional_records':[], 'send_response':True, 'target_address':address } if (callable(self.resolver)): try: self.resolver(request_data=request_data, response_data=response_data) except StandardError: self.logger.log(40, 'Error while trying to obtain answers to a dns-query.', exc_info=True) if (response_data['send_response']): self.output_send_response(response_data=response_data) def output_send_response(self, response_data): for element in ('target_address', 'id_string', 'opcode', 'authoritativeness', 'recursion_desired', 'response_code', 'questions', 'answers', 'authoritative_nameservers', 'additional_records'): if not (element in response_data): raise ValueError('Missing element %s in response_data.' % (element,)) outputstring = self.output_generate(id_string=response_data['id_string'], opcode=response_data['opcode'], authoritativeness=response_data['authoritativeness'], recursion_desired=response_data['recursion_desired'], response_code=response_data['response_code'], questions=response_data['questions'], answers=response_data['answers'], authoritative_nameservers=response_data['authoritative_nameservers'], additional_records=response_data['additional_records']) self.output_send_dgram(response_data['target_address'], outputstring) def fd_forget(self, fd): if (self.fd == fd): self.fd = None def shutdown(self): self.fd.close() def clean_up(self): self.shutdown()