# Written by Ingar Arntzen, Norut # see LICENSE.txt for license information """ This module implements a UPnP Service. This involves a base class intended for development of new services. The baseclass hides complexity related to producing UPnP Service description in both XML and HTML format. It also hides complexity related to the placement of a service within a device hierarchy. """ import types import uuid import exceptions import Tribler.UPnP.common.upnpmarshal as upnpmarshal class ActionError (exceptions.Exception): """Error associated with invoking actions on a UPnP Server. """ pass ############################################## # XML FMT ############################################## _SERVICE_DESCRIPTION_FMT = """ 1 0 %s %s """ _ACTION_FMT = """ %s %s """ _ARGUMENT_FMT = """ %s %s %s """ _EVENTED_VARIABLE_FMT = """ %s %s %s """ _ARG_VARIABLE_FMT = """ %s %s """ def _service_description_toxml(service): """This function produces the UPnP XML service description.""" svs_str = "" # Evented Variables for evar in service.get_evented_variables(): data_type = upnpmarshal.dumps_data_type(evar.the_type) default_value = upnpmarshal.dumps(evar.default_value) args = (evar.the_name, data_type, default_value) svs_str += _EVENTED_VARIABLE_FMT % args actions_str = "" arg_counter = 0 # One state variable per type (event variables of arguments) unique_variables = {} # type : variable name for evar in service.get_evented_variables(): if not unique_variables.has_key(evar.the_type): unique_variables[evar.the_type] = evar.the_name # Arguments for action in service.get_actions(): args_str = "" for arg in action.in_arg_list + action.out_arg_list: # Check if argument can be related to event variable if unique_variables.has_key(arg.the_type): related_variable_name = unique_variables[arg.the_type] else: arg_counter += 1 related_variable_name = "A_ARG_TYPE_%d" % arg_counter unique_variables[arg.the_type] = related_variable_name # New State Variable data_type = upnpmarshal.dumps_data_type(arg.the_type) svs_str += _ARG_VARIABLE_FMT % (related_variable_name, data_type) # New Argument direction = 'in' if isinstance(arg, _InArg) else 'out' args_str += _ARGUMENT_FMT % (arg.the_name, related_variable_name, direction) # Action actions_str += _ACTION_FMT % (action.name, args_str) return _SERVICE_DESCRIPTION_FMT % (actions_str, svs_str) ############################################## # UPNP SERVICE ############################################## class UPnPService: """ This implements a base class for all UPnP Services. New services should extend this class. The base class hides complexity related to production of XML service descriptions as well as HTTP descriptions. The base class also hides complexity related to placement in the UPnP device hierarchy. """ def __init__(self, service_id, service_type, service_version=1): self.service_manager = None self._actions = {} # actionName : Action self._events = {} # eventName : Event self._subs = {} # callbackURL : Subscriptions # Initialise self.service_type = service_type self.service_version = service_version self.service_id = service_id self.base_url = "" self.description_path = "" self.control_path = "" self.event_path = "" self._logger = None def set_service_manager(self, service_manager): """Initialise UPnP service with reference to service manager.""" self.service_manager = service_manager self.base_url = self.service_manager.get_base_url() self.description_path = "services/%s/description.xml" % self.service_id self.control_path = "services/%s/control" % self.service_id self.event_path = "services/%s/events" % self.service_id # Logging self._logger = self.service_manager.get_logger() def is_valid(self): """Check if service is valid.""" return (self.service_type != None and self.service_id != None and self.base_url != None and self.service_manager != None) def get_short_service_id(self): """Return short service id.""" return self.service_id def get_service_id(self): """Return full service id.""" return "urn:upnp-org:serviceId:%s" % self.service_id def get_service_type(self): """Return service type.""" fmt = "urn:schemas-upnp-org:service:%s:%s" return fmt % (self.service_type, self.service_version) def get_xml_description(self): """Returns xml description of service.""" return _service_description_toxml(self) def close(self): """Close UPnP service safely.""" for sub in self._subs.values(): sub.close() ############################################## # LOG API ############################################## def log(self, msg): """Logger.""" if self._logger: self._logger.log("SERVICE", "%s %s" % (self.service_id, msg)) ############################################## # SUBSCRIBE / NOTIFY API ############################################## def notify(self, evented_variables): """Notify all subscribers of updated event variables.""" self._remove_expired_subscriptions() # Dispatch Event Messages to all subscribers # of the given serviceid. # Make sure all stateVariables are evented variables. for sub in self._subs.values(): sub.notify(evented_variables) def subscribe(self, callback_urls, requested_duration): """Process new subscription request.""" # requested duration == 0 => infinite self._remove_expired_subscriptions() # For the moment, just accept a single callbackUrl # Subscriber defined by callbackUrl callback_url = callback_urls[0] if self._subs.has_key(callback_url): # Subscriber already exists return (None, None) else: # Add new Subscriber sub = _Subscription(self, callback_url, requested_duration) self._subs[callback_url] = sub # Dispatch Initial Event Message sub.initial_notify() return (sub.sid, sub.duration) def renew(self, sid_str, requested_duration): """Request to renew an existing subscription.""" # requested duration == 0 => infinite for sub in self._subs.values(): if str(sub.sid) == sid_str: return sub.renew(requested_duration) else: return None def unsubscribe(self, sid_str): """Request to unsubscribe an existing subscription.""" sub = None for sub in self._subs.values(): if str(sub.sid) == sid_str: break if sub: sub.cancel() del self._subs[sub.callback_url] return True else: return False def _remove_expired_subscriptions(self): """Scans subscriptions and removes invalidated.""" for url, sub in self._subs.items()[:]: if sub.is_expired: del self._subs[url] ############################################## # ACTION API ############################################## def define_action(self, method, in_args=None, out_args=None, name=None): """Define an action that the service implements. Used by subclass.""" if not in_args: in_args = [] if not out_args: out_args = [] if not name: action_name = method.__name__ else: action_name = name # In/Out Args must be tuples of (name, type) in_args = [_InArg(t[0], t[1]) for t in in_args] out_args = [_OutArg(t[0], t[1]) for t in out_args] action = _Action(action_name, method, in_args, out_args) self._actions[action_name] = action def invoke_action(self, action_name, in_args): """Invoke and action that the service implements. Used by httpserver as part of UPnP control interface.""" # in_args is assumed to be tuple of (name, data) all unicode string. try: if not self._actions.has_key(action_name): raise ActionError, "Action Not Supported" else: action = self._actions[action_name] return action.execute(in_args) except ActionError, why: print why def get_actions(self): """Returns all actions that the service implements.""" return self._actions.values() ############################################## # EVENTED VARIABLE API ############################################## def define_evented_variable(self, event_name, the_type, default_value): """Define an evented variable for the service. Used by subclass.""" evar = _EventedVariable(self, event_name, the_type, default_value) self._events[event_name] = evar return evar def get_evented_variable(self, event_name): """Return evented variable given name.""" return self._events.get(event_name, None) def get_evented_variables(self): """Return all evented variables defined by the service.""" return self._events.values() def set_evented_variables(self, list_): """ Update a list of state variables at once. Input will be a list of tuples [(eventedVariable, newValue)] The method avoids sending one notification to every subscriber, for each state variable. Instead, a single subscriber receives one eventMessage containing all the updated state Variables in this list. """ # Update Values changed_variables = [] for evar, new_value in list_: changed = evar.set(new_value, notify_ok=False) if changed: changed_variables.append(evar) # notify all in one batch self.notify(changed_variables) ############################################## # EVENTED VARIABLE ############################################## class _EventedVariable: """This class defines an evented variable. The class hides complexity related to event notification.""" def __init__(self, service, the_name, the_type, default_value): self._service = service self.the_name = the_name if type(the_type) == types.TypeType: self.the_type = the_type else: msg = "Argument 'the_type' is not actually a python type." raise TypeError, msg self._value = default_value self.default_value = default_value def set(self, new_value, notify_ok=True): """Set a new value for the evented variable. If the value is different from the old value, notifications will be generated.""" if type(new_value) != self.the_type: msg = "Argument 'the_type' is not actually a python type." raise TypeError, msg if new_value != self._value: # Update Value self._value = new_value # Notify if notify_ok: self._service.notify([self]) return True else : return False def get(self): """Get the value of an evented variable.""" return self._value ############################################## # ARGUMENT ############################################## class _Argument : """The class defines an argument by holding a type and and argument name.""" def __init__(self, the_name, the_type): self.the_name = the_name self.the_type = the_type class _InArg(_Argument): """The class defines an input argument by holding a type and and argument name.""" pass class _OutArg(_Argument): """The class defines an output argument (result value) by holding a type and and argument name.""" pass ############################################## # ACTION ############################################## class _Action: """This class represents an action implemented by the service.""" def __init__(self, name, method, in_arg_list, out_arg_list): self.name = name self.method = method self.in_arg_list = in_arg_list self.out_arg_list = out_arg_list def execute(self, in_args): """Execute the action.""" # in_args is assumed to be tuple of (name, data) all unicode string. # the tuple is supposed to be ordered according to in_arg_list if len(in_args) != len(self.in_arg_list): raise ActionError, "Wrong number of input arguments" typed_args = [] for i in range(len(in_args)): name, data = in_args[i] in_arg = self.in_arg_list[i] if name != in_arg.the_name: raise ActionError, "Wrong name/order for input argument" try: value = upnpmarshal.loads(in_arg.the_type, data) except upnpmarshal.MarshalError, why: raise ActionError, why typed_args.append(value) # Execute try: result = self.method(*typed_args) except TypeError, why: raise ActionError, "Method Execution Failed (%s)" % why # Result is eiter a single value (incl. None) or a tuple of values. # Make it into a list in both cases. if result == None: result = [] elif result == types.TupleType: result = list(result) else: result = [result] # Check that result holds the correct number of values if len(result) != len(self.out_arg_list): raise ActionError, "Wrong number of Results" # Check that each value has the correct type # Also convert python type objects to string representations. # Construct out_args list of tuples [(name, data), ...] out_args = [] for i in range(len(result)): out_arg = self.out_arg_list[i] value = result[i] if not isinstance(value, out_arg.the_type): raise ActionError, "Result is wrong type." else: try: data = upnpmarshal.dumps(value) except upnpmarshal.MarshalError, why: raise ActionError, why out_args.append((out_arg.the_name, data)) return out_args ############################################## # SUBSCRIPTION ############################################## class NotifyError (exceptions.Exception): """Error associated with event notification.""" pass class _Subscription: """This class represents a subscription made to the service, for notification whenever one of its evented variables is updated.""" def __init__(self, service, callback_url, requested_duration): # requested_duration == 0 implies INFINITE # requested_duration > 0 implies FINITE # requested_duration < 0 not legal self.service = service self.sid = uuid.uuid1() self.event_key = 0 self.callback_url = callback_url self.duration = 1800 # ignore requested_duration self.is_expired = False def notify(self, evented_variables): """Notify this subscriber that given evented variables have been updated.""" if self.is_expired : return False # should not be neccesary else: self.event_key += 1 # Construct list of tuples [(name, value), ...] variables = [] for evar in evented_variables: try: data = upnpmarshal.dumps(evar.get()) except upnpmarshal.MarshalError, why: raise NotifyError, why variables.append((evar.the_name, data)) # Dispatch Notification edp = self.service.service_manager.get_event_dispatcher() edp.dispatch(self.sid, self.event_key, self.callback_url, variables) return True def initial_notify(self): """Notify this subscriber of all evented state variables and their values""" if self.is_expired: return False # Event Key must be 0 if self.event_key != 0: return False # All Evented Variables evented_variables = self.service.get_evented_variables() variables = [] for evar in evented_variables: try: data = upnpmarshal.dumps(evar.get()) except upnpmarshal.MarshalError, why: raise NotifyError, why variables.append((evar.the_name, data)) # Dispatch Notification edp = self.service.service_manager.get_event_dispatcher() edp.dispatch(self.sid, 0, self.callback_url, variables) return True def renew(self, requested_duration): """Renew subscription for this subscriber.""" self.duration = requested_duration self.is_expired = False return self.duration def cancel(self): """Cancel subscription for this subscriber.""" self.is_expired = True return True def close(self): """Close this subscription safely.""" pass ############################################## # MAIN ############################################## if __name__ == '__main__': class MockEventDispatcher: """Mock Event Dispatcher.""" def __init__(self): pass def dispatch(self, sid, event_key, callback_url, variables): """Mock method.""" print "Notify", sid, event_key, callback_url, variables class MockServiceManager: """Mock Service Manager.""" def __init__(self): self._ed = MockEventDispatcher() def get_event_dispatcher(self): """Mock method.""" return self._ed def get_base_url(self): """Mock method.""" return "http://myhost:44444" def get_logger(self): """Mock method.""" return None SM = MockServiceManager() from Tribler.UPnP.services import SwitchPower SERVICE = SwitchPower('SwitchPower') SERVICE.set_service_manager(SM) print SERVICE.get_xml_description()