from __future__ import absolute_import, print_function import json as json_parser import logging import urllib import websocket import dockercloud from .exceptions import ApiError, AuthError from .http import send_request logger = logging.getLogger("python-dockercloud") class BasicObject(object): _api_version = 'v1' def __init__(self, **kwargs): pass class Restful(BasicObject): _detail_uri = None namespaced = True def __init__(self, **kwargs): """Simply reflect all the values in kwargs""" for k, v in list(kwargs.items()): setattr(self, k, v) def __addchanges__(self, name): changed_attrs = self.__getchanges__() if not name in changed_attrs: changed_attrs.append(name) self.__setchanges__(changed_attrs) def __setattr__(self, name, value): """Keeps track of what attributes have been set""" current_value = getattr(self, name, None) if value != current_value: self.__addchanges__(name) super(Restful, self).__setattr__(name, value) def __getchanges__(self): """Internal. Convenience method to get the changed attrs list""" return getattr(self, '__changedattrs__', []) def __setchanges__(self, val): """Internal. Convenience method to set the changed attrs list""" # Use the super implementation to prevent infinite recursion super(Restful, self).__setattr__('__changedattrs__', val) def _loaddict(self, dict): """Internal. Sets the model attributes to the dictionary values passed""" endpoint = getattr(self, 'endpoint', None) subsystem = getattr(self, 'subsystem', None) assert endpoint, "Endpoint not specified for %s" % self.__class__.__name__ assert subsystem, "Subsystem not specified for %s" % self.__class__.__name__ for k, v in list(dict.items()): setattr(self, k, v) if self.namespaced and dockercloud.namespace: self._detail_uri = "/".join(["api", subsystem, self._api_version, dockercloud.namespace, endpoint.strip("/"), self.pk]) else: self._detail_uri = "/".join(["api", subsystem, self._api_version, endpoint.strip("/"), self.pk]) self.__setchanges__([]) @property def pk(self): """Returns the primary key for the object. :returns: str -- the primary key for the object """ return getattr(self, self._pk_key(), None) @classmethod def _pk_key(cls): """Internal. Returns the attribute name that acts as primary key of the model. Can be overridden by subclasses. :returns: str -- the name of the primary key attribute for the model """ return 'uuid' @property def is_dirty(self): """Returns whether or not the object has unsaved changes :returns: bool -- whether or not the object has unsaved changes """ return len(self.__getchanges__()) > 0 def _perform_action(self, action, params=None, data={}): """Internal. Performs the specified action on the object remotely""" success = False if not self._detail_uri: raise ApiError("You must save the object before performing this operation") path = "/".join([self._detail_uri.rstrip("/"), action.lstrip("/")]) json = send_request("POST", path, params=params, data=data) if json: self._loaddict(json) success = True return success def _expand_attribute(self, attribute): """Internal. Expands the given attribute from remote information""" if not self._detail_uri: raise ApiError("You must save the object before performing this operation") path = "/".join([self._detail_uri, attribute]) json = send_request("GET", path) if json: return json[attribute] return None def get_all_attributes(self): """Returns a dict with all object attributes :returns: dict -- all object attributes as a dict """ attributes = {} for attr in [attr for attr in vars(self) if not attr.startswith('_')]: attributes[attr] = getattr(self, attr, None) return attributes class Immutable(Restful): @classmethod def fetch(cls, pk): instance = None endpoint = getattr(cls, 'endpoint', None) subsystem = getattr(cls, 'subsystem', None) assert endpoint, "Endpoint not specified for %s" % cls.__name__ assert subsystem, "Subsystem not specified for %s" % cls.__name__ if cls.namespaced and dockercloud.namespace: detail_uri = "/".join(["api", subsystem, cls._api_version, dockercloud.namespace, endpoint.strip("/"), pk]) else: detail_uri = "/".join(["api", subsystem, cls._api_version, endpoint.strip("/"), pk]) json = send_request('GET', detail_uri) if json: instance = cls() instance._loaddict(json) return instance @classmethod def list(cls, limit=None, **kwargs): restful = [] endpoint = getattr(cls, 'endpoint', None) subsystem = getattr(cls, 'subsystem', None) assert endpoint, "Endpoint not specified for %s" % cls.__name__ assert subsystem, "Subsystem not specified for %s" % cls.__name__ if cls.namespaced and dockercloud.namespace: detail_uri = "/".join(["api", subsystem, cls._api_version, dockercloud.namespace, endpoint.strip("/")]) else: detail_uri = "/".join(["api", subsystem, cls._api_version, endpoint.strip("/")]) objects = [] while True: if limit and len(objects) >= limit: break json = send_request('GET', detail_uri, params=kwargs) objs = json.get('objects', []) meta = json.get('meta', {}) next_url = meta.get('next', '') offset = meta.get('offset', 0) api_limit = meta.get('limit', 0) objects.extend(objs) if next_url: kwargs['offset'] = offset + api_limit kwargs['limit'] = api_limit else: break if limit: objects = objects[:limit] for obj in objects: instance = cls() instance._loaddict(obj) restful.append(instance) return restful def refresh(self, force=False): success = False if self.is_dirty and not force: # We have local non-committed changes - rejecting the refresh success = False elif not self._detail_uri: raise ApiError("You must save the object before performing this operation") else: json = send_request("GET", self._detail_uri) if json: self._loaddict(json) success = True return success class Mutable(Immutable): @classmethod def create(cls, **kwargs): """Returns a new instance of the model (without saving it) with the attributes specified in ``kwargs`` :returns: RESTModel -- a new local instance of the model """ return cls(**kwargs) def delete(self): if not self._detail_uri: raise ApiError("You must save the object before performing this operation") action = "DELETE" url = self._detail_uri json = send_request(action, url) if json: self._loaddict(json) else: # Object deleted successfully and nothing came back - deleting PK reference. self._detail_uri = None # setattr(self, self._pk_key(), None) -- doesn't work self.__setchanges__([]) return True def save(self): success = False if not self.is_dirty: # No changes success = True else: cls = self.__class__ endpoint = getattr(cls, 'endpoint', None) subsystem = getattr(cls, 'subsystem', None) assert endpoint, "Endpoint not specified for %s" % self.__class__.__name__ assert subsystem, "Subsystem not specified for %s" % self.__class__.__name__ # Figure out whether we should do a create or update if not self._detail_uri: action = "POST" if cls.namespaced and dockercloud.namespace: path = "/".join(["api", subsystem, self._api_version, dockercloud.namespace, endpoint.lstrip("/")]) else: path = "/".join(["api", subsystem, self._api_version, endpoint.lstrip("/")]) else: action = "PATCH" path = self._detail_uri # Construct the necessary params params = {} for attr in self.__getchanges__(): value = getattr(self, attr, None) params[attr] = value # Construct the json body payload = None if params: payload = json_parser.dumps(params) if not payload: payload = json_parser.dumps({}) # Make the request success = False json = send_request(action, path, data=payload) if json: self._loaddict(json) success = True return success class Taggable(BasicObject): pass class Triggerable(BasicObject): pass class StreamingAPI(BasicObject): def __init__(self, url): self.url = url user_agent = 'python-dockercloud/%s' % dockercloud.__version__ if dockercloud.user_agent: user_agent = "%s %s" % (dockercloud.user_agent, user_agent) header = {'User-Agent': user_agent} header.update(dockercloud.auth.get_auth_header()) self.header = [": ".join([key, value]) for key, value in header.items()] logger.info("Websocket: %s %s" % (self.url, self.header)) self.open_handler = None self.message_handler = None self.error_handler = None self.close_handler = None self.auth_error = False def _on_open(self, ws): if self.open_handler: self.open_handler() def _on_message(self, ws, message): if self.message_handler: self.message_handler(message) def _on_error(self, ws, error): if self.error_handler: self.error_handler(error) def _on_close(self, ws): if self.close_handler: self.close_handler() def on_open(self, handler): self.open_handler = handler def on_message(self, handler): self.message_handler = handler def on_error(self, handler): self.error_handler = handler def on_close(self, handler): self.close_handler = handler def run_forever(self, *args, **kwargs): while True: if getattr(self, "auth_error", False): raise AuthError("Not authorized") ws = websocket.WebSocketApp(self.url, header=self.header, on_open=self._on_open, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close) ws.run_forever(ping_interval=10, ping_timeout=5, *args, **kwargs) class StreamingLog(StreamingAPI): def __init__(self, subsystem, resource, uuid, tail, follow): endpoint = "%s/%s/logs/?follow=%s" % (resource, uuid, str(follow).lower()) if tail: endpoint = "%s&tail=%d" % (endpoint, tail) if dockercloud.namespace: url = "/".join([dockercloud.stream_host.rstrip("/"), "api", subsystem, self._api_version, dockercloud.namespace, endpoint.lstrip("/")]) else: url = "/".join([dockercloud.stream_host.rstrip("/"), "api", subsystem, self._api_version, endpoint.lstrip("/")]) super(self.__class__, self).__init__(url) @staticmethod def default_log_handler(message): print(message) def run_forever(self, *args, **kwargs): ws = websocket.WebSocketApp(self.url, header=self.header, on_open=self._on_open, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close) ws.run_forever(ping_interval=10, ping_timeout=5, *args, **kwargs) class Exec(StreamingAPI): def __init__(self, uuid, cmd='sh'): endpoint = "container/%s/exec/?command=%s" % (uuid, urllib.quote_plus(cmd)) if dockercloud.namespace: url = "/".join([dockercloud.stream_host.rstrip("/"), "api", "app", self._api_version, dockercloud.namespace, endpoint.lstrip("/")]) else: url = "/".join([dockercloud.stream_host.rstrip("/"), "api", "app", self._api_version, endpoint.lstrip("/")]) super(self.__class__, self).__init__(url) @staticmethod def default_message_handler(message): print(message) def run_forever(self, *args, **kwargs): ws = websocket.WebSocketApp(self.url, header=self.header, on_open=self._on_open, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close) ws.run_forever(ping_interval=10, ping_timeout=5, *args, **kwargs)