Compare commits

..

No commits in common. "staging" and "master" have entirely different histories.

27 changed files with 204 additions and 248 deletions

View file

@ -6,7 +6,7 @@ from .base import Immutable, StreamingLog
class Action(Immutable):
subsystem = 'audit'
endpoint = "/action"
is_namespaced = False
namespaced = False
@classmethod
def _pk_key(cls):

View file

@ -12,7 +12,6 @@ from .http import send_request
HUB_INDEX = "https://index.docker.io/v1/"
def authenticate(username, password):
verify_credential(username, password)
dockercloud.basic_auth = base64.b64encode("%s:%s" % (username, password))
@ -56,8 +55,7 @@ def load_from_file(f="~/.docker/config.json"):
p = subprocess.Popen([cmd, 'get'], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
out = p.communicate(input=HUB_INDEX)[0]
except:
raise dockercloud.AuthError(
'error getting credentials - err: exec: "%s": executable file not found in $PATH, out: ``' % cmd)
raise dockercloud.AuthError('error getting credentials - err: exec: "%s": executable file not found in $PATH, out: ``' % cmd)
try:
credential = json.loads(out)

View file

@ -16,19 +16,18 @@ logger = logging.getLogger("python-dockercloud")
class BasicObject(object):
_api_version = 'v1'
def __init__(self, **kwargs):
pass
class Restful(BasicObject):
is_namespaced = True
_detail_uri = None
namespaced = True
def __init__(self, namespace="", **kwargs):
def __init__(self, **kwargs):
"""Simply reflect all the values in kwargs"""
for k, v in list(kwargs.items()):
setattr(self, k, v)
if self.is_namespaced and namespace:
self._namespace = namespace
else:
self._namespace = dockercloud.namespace
self._resource_uri = ""
def __addchanges__(self, name):
changed_attrs = self.__getchanges__()
@ -39,7 +38,7 @@ class Restful(BasicObject):
def __setattr__(self, name, value):
"""Keeps track of what attributes have been set"""
current_value = getattr(self, name, None)
if value != current_value and not name.startswith("_"):
if value != current_value:
self.__addchanges__(name)
super(Restful, self).__setattr__(name, value)
@ -54,10 +53,17 @@ class Restful(BasicObject):
def _loaddict(self, dict):
"""Internal. Sets the model attributes to the dictionary values passed"""
endpoint = getattr(self, 'endpoint', None)
subsystem = getattr(self, 'subsystem', None)
assert endpoint, "Endpoint not specified for %s" % self.__class__.__name__
assert subsystem, "Subsystem not specified for %s" % self.__class__.__name__
for k, v in list(dict.items()):
setattr(self, k, v)
self._resource_uri = getattr(self, "resource_uri", None)
if self.namespaced and dockercloud.namespace:
self._detail_uri = "/".join(["api", subsystem, self._api_version, dockercloud.namespace,
endpoint.strip("/"), self.pk])
else:
self._detail_uri = "/".join(["api", subsystem, self._api_version, endpoint.strip("/"), self.pk])
self.__setchanges__([])
@property
@ -87,9 +93,9 @@ class Restful(BasicObject):
def _perform_action(self, action, params=None, data={}):
"""Internal. Performs the specified action on the object remotely"""
success = False
if not self._resource_uri:
if not self._detail_uri:
raise ApiError("You must save the object before performing this operation")
path = "/".join([self._resource_uri.rstrip("/"), action.lstrip("/")])
path = "/".join([self._detail_uri.rstrip("/"), action.lstrip("/")])
json = send_request("POST", path, params=params, data=data)
if json:
self._loaddict(json)
@ -98,9 +104,9 @@ class Restful(BasicObject):
def _expand_attribute(self, attribute):
"""Internal. Expands the given attribute from remote information"""
if not self._resource_uri:
if not self._detail_uri:
raise ApiError("You must save the object before performing this operation")
path = "/".join([self._resource_uri, attribute])
path = "/".join([self._detail_uri, attribute])
json = send_request("GET", path)
if json:
return json[attribute]
@ -119,43 +125,39 @@ class Restful(BasicObject):
class Immutable(Restful):
@classmethod
def fetch(cls, pk, namespace=""):
def fetch(cls, pk):
instance = None
endpoint = getattr(cls, 'endpoint', None)
subsystem = getattr(cls, 'subsystem', None)
assert endpoint, "Endpoint not specified for %s" % cls.__name__
assert subsystem, "Subsystem not specified for %s" % cls.__name__
if not namespace:
namespace = dockercloud.namespace
if cls.is_namespaced and namespace:
resource_uri = "/".join(["api", subsystem, cls._api_version, namespace, endpoint.strip("/"), pk])
if cls.namespaced and dockercloud.namespace:
detail_uri = "/".join(["api", subsystem, cls._api_version, dockercloud.namespace, endpoint.strip("/"), pk])
else:
resource_uri = "/".join(["api", subsystem, cls._api_version, endpoint.strip("/"), pk])
json = send_request('GET', resource_uri)
detail_uri = "/".join(["api", subsystem, cls._api_version, endpoint.strip("/"), pk])
json = send_request('GET', detail_uri)
if json:
instance = cls()
instance._loaddict(json)
return instance
@classmethod
def list(cls, limit=None, namespace="", **kwargs):
def list(cls, limit=None, **kwargs):
restful = []
endpoint = getattr(cls, 'endpoint', None)
subsystem = getattr(cls, 'subsystem', None)
assert endpoint, "Endpoint not specified for %s" % cls.__name__
assert subsystem, "Subsystem not specified for %s" % cls.__name__
if not namespace:
namespace = dockercloud.namespace
if cls.is_namespaced and namespace:
resource_uri = "/".join(["api", subsystem, cls._api_version, namespace, endpoint.strip("/")])
if cls.namespaced and dockercloud.namespace:
detail_uri = "/".join(["api", subsystem, cls._api_version, dockercloud.namespace, endpoint.strip("/")])
else:
resource_uri = "/".join(["api", subsystem, cls._api_version, endpoint.strip("/")])
detail_uri = "/".join(["api", subsystem, cls._api_version, endpoint.strip("/")])
objects = []
while True:
if limit and len(objects) >= limit:
break
json = send_request('GET', resource_uri, params=kwargs)
json = send_request('GET', detail_uri, params=kwargs)
objs = json.get('objects', [])
meta = json.get('meta', {})
next_url = meta.get('next', '')
@ -180,10 +182,10 @@ class Immutable(Restful):
if self.is_dirty and not force:
# We have local non-committed changes - rejecting the refresh
success = False
elif not self._resource_uri:
elif not self._detail_uri:
raise ApiError("You must save the object before performing this operation")
else:
json = send_request("GET", self._resource_uri)
json = send_request("GET", self._detail_uri)
if json:
self._loaddict(json)
success = True
@ -200,17 +202,16 @@ class Mutable(Immutable):
return cls(**kwargs)
def delete(self):
if not self._resource_uri:
if not self._detail_uri:
raise ApiError("You must save the object before performing this operation")
action = "DELETE"
url = self._resource_uri
url = self._detail_uri
json = send_request(action, url)
if json:
self._loaddict(json)
self._resource_uri = None
else:
# Object deleted successfully and nothing came back - deleting PK reference.
self._resource_uri = None
self._detail_uri = None
# setattr(self, self._pk_key(), None) -- doesn't work
self.__setchanges__([])
return True
@ -227,15 +228,15 @@ class Mutable(Immutable):
assert endpoint, "Endpoint not specified for %s" % self.__class__.__name__
assert subsystem, "Subsystem not specified for %s" % self.__class__.__name__
# Figure out whether we should do a create or update
if not self._resource_uri:
if not self._detail_uri:
action = "POST"
if cls.is_namespaced and self._namespace:
path = "/".join(["api", subsystem, self._api_version, self._namespace, endpoint.lstrip("/")])
if cls.namespaced and dockercloud.namespace:
path = "/".join(["api", subsystem, self._api_version, dockercloud.namespace, endpoint.lstrip("/")])
else:
path = "/".join(["api", subsystem, self._api_version, endpoint.lstrip("/")])
else:
action = "PATCH"
path = self._resource_uri
path = self._detail_uri
# Construct the necessary params
params = {}
for attr in self.__getchanges__():
@ -321,16 +322,13 @@ class StreamingAPI(BasicObject):
class StreamingLog(StreamingAPI):
def __init__(self, subsystem, resource, uuid, tail, follow, namespace=""):
def __init__(self, subsystem, resource, uuid, tail, follow):
endpoint = "%s/%s/logs/?follow=%s" % (resource, uuid, str(follow).lower())
if tail:
endpoint = "%s&tail=%d" % (endpoint, tail)
if not namespace:
namespace = dockercloud.namespace
if namespace:
if dockercloud.namespace:
url = "/".join([dockercloud.stream_host.rstrip("/"), "api", subsystem, self._api_version,
self._namespace, endpoint.lstrip("/")])
dockercloud.namespace, endpoint.lstrip("/")])
else:
url = "/".join([dockercloud.stream_host.rstrip("/"), "api", subsystem, self._api_version,
endpoint.lstrip("/")])
@ -350,13 +348,11 @@ class StreamingLog(StreamingAPI):
class Exec(StreamingAPI):
def __init__(self, uuid, cmd='sh', namespace=""):
def __init__(self, uuid, cmd='sh'):
endpoint = "container/%s/exec/?command=%s" % (uuid, urllib.quote_plus(cmd))
if not namespace:
namespace = dockercloud.namespace
if namespace:
if dockercloud.namespace:
url = "/".join([dockercloud.stream_host.rstrip("/"), "api", "app", self._api_version,
namespace, endpoint.lstrip("/")])
dockercloud.namespace, endpoint.lstrip("/")])
else:
url = "/".join([dockercloud.stream_host.rstrip("/"), "api", "app", self._api_version, endpoint.lstrip("/")])
super(self.__class__, self).__init__(url)

View file

@ -13,14 +13,11 @@ logger = logging.getLogger("python-dockercloud")
class Events(StreamingAPI):
def __init__(self, namespace=""):
def __init__(self):
endpoint = "events"
if not namespace:
namespace = dockercloud.namespace
if namespace:
if dockercloud.namespace:
url = "/".join([dockercloud.stream_host.rstrip("/"), "api", "audit", self._api_version,
namespace, endpoint.lstrip("/")])
dockercloud.namespace, endpoint.lstrip("/")])
else:
url = "/".join([dockercloud.stream_host.rstrip("/"), "api", "audit", self._api_version,
endpoint.lstrip("/")])

View file

@ -8,7 +8,7 @@ class Node(Mutable, Taggable):
endpoint = "/node"
def save(self):
if not self.resource_uri:
if not self._detail_uri:
raise AttributeError("Adding a new node is not supported via 'save' method")
super(Node, self).save()

View file

@ -6,7 +6,7 @@ from .base import Immutable
class AZ(Immutable):
subsystem = "infra"
endpoint = "/az"
is_namespaced = False
namespaced = False
@classmethod
def _pk_key(cls):

View file

@ -16,7 +16,7 @@ class NodeCluster(Mutable, Taggable):
def create(cls, **kwargs):
for key, value in kwargs.items():
if key == "node_type" and isinstance(value, NodeType):
kwargs[key] = getattr(value, "_resource_uri", "")
kwargs[key] = getattr(value, "resource_uri", "")
if key == "region" and isinstance(value, Region):
kwargs[key] = getattr(value, "_resource_uri", "")
kwargs[key] = getattr(value, "resource_uri", "")
return cls(**kwargs)

View file

@ -6,7 +6,7 @@ from .base import Immutable
class Provider(Immutable):
subsystem = "infra"
endpoint = "/provider"
is_namespaced = False
namespaced = False
@classmethod
def _pk_key(cls):

View file

@ -6,7 +6,7 @@ from .base import Immutable
class Region(Immutable):
subsystem = "infra"
endpoint = "/region"
is_namespaced = False
namespaced = False
@classmethod
def _pk_key(cls):

View file

@ -6,7 +6,7 @@ from .base import Immutable
class NodeType(Immutable):
subsystem = "infra"
endpoint = "/nodetype"
is_namespaced = False
namespaced = False
@classmethod
def _pk_key(cls):

View file

@ -20,7 +20,7 @@ class Stack(Mutable):
return self._perform_action("redeploy", params=params)
def export(self):
if not self.resource_uri:
if not self._detail_uri:
raise ApiError("You must save the object before performing this operation")
url = "/".join([self.resource_uri, "export"])
url = "/".join([self._detail_uri, "export"])
return send_request("GET", url, inject_header=False)

View file

@ -57,7 +57,7 @@ class Tag(BasicObject):
def fetch(cls, taggable):
if not isinstance(taggable, Taggable):
raise ApiError("The object does not support tag")
if not taggable.resource_uri:
if not taggable._detail_uri:
raise ApiError("You must save the taggable object before performing this operation")
tag = cls()

View file

@ -8,7 +8,6 @@ from .http import send_request
class Trigger(BasicObject):
def __init__(self):
self.trigger = None
self.resource_uri = None
def add(self, name=None, operation=None):
@ -31,11 +30,11 @@ class Trigger(BasicObject):
return cls(**kwargs)
def delete(self, uuid):
if not self.resource_uri:
if not self.endpoint:
raise ApiError("You must initialize the Trigger object before performing this operation")
action = "DELETE"
url = "/".join([self.resource_uri, uuid])
url = "/".join([self.endpoint, uuid])
send_request(action, url)
return True
@ -44,11 +43,11 @@ class Trigger(BasicObject):
if not isinstance(triggerable, Triggerable):
raise ApiError("The object does not support trigger")
if not triggerable.resource_uri:
if not triggerable._detail_uri:
raise ApiError("You must save the triggerable object before performing this operation")
trigger = cls()
trigger.resource_uri = "/".join([triggerable.resource_uri, "trigger"])
trigger.endpoint = "/".join([triggerable._detail_uri, "trigger"])
handlers = []
for t in trigger.list():
triggername = t.get("name", "")
@ -57,12 +56,12 @@ class Trigger(BasicObject):
return trigger
def list(self, **kwargs):
if not self.resource_uri:
if not self.endpoint:
raise ApiError("You must initialize the Trigger object before performing this operation")
objects = []
while True:
json = send_request('GET', self.resource_uri, params=kwargs)
json = send_request('GET', self.endpoint, params=kwargs)
objs = json.get('objects', [])
meta = json.get('meta', {})
next_url = meta.get('next', '')
@ -78,23 +77,23 @@ class Trigger(BasicObject):
return objects
def save(self):
if not self.resource_uri:
if not self.endpoint:
raise ApiError("You must initialize the Trigger object before performing this operation")
if self.trigger is None:
return True
json = send_request("POST", self.resource_uri, data=json_parser.dumps(self.trigger))
json = send_request("POST", self.endpoint, data=json_parser.dumps(self.trigger))
if json:
self.clear()
self.clear()
return True
def call(self, uuid):
if not self.resource_uri:
if not self.endpoint:
raise ApiError("You must initialize the Trigger object before performing this operation")
json = send_request("POST", "/".join([self.resource_uri, uuid + "/call"]))
json = send_request("POST", "/".join([self.endpoint, uuid + "/call"]))
if json:
return True
return False

View file

@ -43,7 +43,7 @@ class Utils:
return Action.fetch(id)
else:
raise ApiError(
"Unsupported resource type. Only support: action, container, node, nodecluster, service, stack")
"Unsupported resource type. Only support: action, container, node, nodecluster, service, stack")
@staticmethod
def fetch_remote_container(identifier, raise_exceptions=True):
@ -162,7 +162,7 @@ class Utils:
elif len(objects_same_identifier) == 0:
raise ObjectNotFound("Cannot find a node cluster with the identifier '%s'" % identifier)
raise NonUniqueIdentifier(
"More than one node cluster has the same identifier, please use the long uuid")
"More than one node cluster has the same identifier, please use the long uuid")
except (NonUniqueIdentifier, ObjectNotFound) as e:
if not raise_exceptions:
@ -185,7 +185,7 @@ class Utils:
elif len(objects_same_identifier) == 0:
raise ObjectNotFound("Cannot find an action cluster with the identifier '%s'" % identifier)
raise NonUniqueIdentifier(
"More than one action has the same identifier, please use the long uuid")
"More than one action has the same identifier, please use the long uuid")
except (NonUniqueIdentifier, ObjectNotFound) as e:
if not raise_exceptions:

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *

View file

@ -4,10 +4,7 @@ import os
import tempfile
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *
@ -78,6 +75,7 @@ class AuthTestCase(unittest.TestCase):
dockercloud.basic_auth = FAKE_BASIC_AUTH
self.assertEqual({'Authorization': FAKE_DOCKERCLOUD_AUTH}, dockercloud.auth.get_auth_header())
print "===================="
dockercloud.dockercloud_auth = None
dockercloud.basic_auth = FAKE_BASIC_AUTH
self.assertEqual({'Authorization': 'Basic %s' % (FAKE_BASIC_AUTH)}, dockercloud.auth.get_auth_header())

View file

@ -3,10 +3,7 @@ from __future__ import absolute_import
import json
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from dockercloud.api.base import Restful, Mutable, Immutable
@ -57,12 +54,14 @@ class RestfulTestCase(unittest.TestCase):
def test_restful_loaddict(self):
model = Restful()
model.endpoint = 'resource_uri'
self.assertRaises(AssertionError, model._loaddict, {'key': 'value'})
model.endpoint = 'endpoint'
model.subsystem = "subsystem"
resource_uri = "/".join(["api", model.subsystem, model._api_version, model.endpoint.lstrip("/"), model.pk])
model._loaddict({'key': 'value', "resource_uri": resource_uri})
model._loaddict({'key': 'value'})
self.assertEqual('value', model.key)
self.assertEqual(resource_uri, model._resource_uri)
self.assertEqual("/".join(["api", model.subsystem, model._api_version, model.endpoint.lstrip("/"), model.pk]),
model._detail_uri)
self.assertEqual([], model.__getchanges__())
def test_restful_pk(self):
@ -78,29 +77,32 @@ class RestfulTestCase(unittest.TestCase):
@mock.patch('dockercloud.api.base.send_request')
def test_restful_perform_action(self, mock_send_request):
try:
model = Restful()
self.assertRaises(dockercloud.ApiError, model._perform_action, 'action')
model = Restful()
self.assertRaises(dockercloud.ApiError, model._perform_action, 'action')
model.endpoint = 'fake'
model.subsystem = "subsystem"
model._detail_uri = "/".join(
["api", model.subsystem, model._api_version, model.endpoint.lstrip("/"), model.pk])
mock_send_request.side_effect = [{'key': 'value'}, None]
self.assertTrue(model._perform_action('action', params={'k': 'v'}, data={'key': 'value'}))
self.assertEqual('value', model.key)
mock_send_request.assert_called_with('POST', "/".join([model._detail_uri, "action"]), data={'key': 'value'},
params={'k': 'v'})
model.endpoint = 'fake'
model.subsystem = "subsystem"
model.resource_uri = "/".join(
["api", model.subsystem, model._api_version, model.endpoint.lstrip("/"), model.pk])
model._resource_uri = model.resource_uri
mock_send_request.side_effect = [{'key': 'value'}, None]
self.assertTrue(model._perform_action('action', params={'k': 'v'}, data={'key': 'value'}))
self.assertEqual('value', model.key)
mock_send_request.assert_called_with('POST', "/".join([model._resource_uri, "action"]), data={'key': 'value'},
params={'k': 'v'})
self.assertFalse(model._perform_action('action', {'key': 'value'}))
self.assertFalse(model._perform_action('action', {'key': 'value'}))
finally:
if hasattr(Restful, 'endpoint'):
delattr(Restful, 'endpoint')
@mock.patch('dockercloud.api.base.send_request')
def test_restful_expand_attribute(self, mock_send_request):
model = Restful()
self.assertRaises(dockercloud.ApiError, model._expand_attribute, 'attribute')
model._resource_uri = 'fake/uuid'
model._detail_uri = 'fake/uuid'
mock_send_request.side_effect = [{'key': 'value'}, None]
self.assertEqual('value', model._expand_attribute('key'))
@ -122,65 +124,63 @@ class ImmutableTestCase(unittest.TestCase):
@mock.patch('dockercloud.api.base.send_request')
def test_immutable_fetch(self, mock_send_request):
try:
delattr(Immutable, 'endpoint')
delattr(Immutable, 'subsystem')
except:
pass
self.assertRaises(AssertionError, Immutable.fetch, 'uuid')
ret_json = {"key": "value"}
mock_send_request.return_value = ret_json
Immutable.endpoint = 'resource_uri'
Immutable.subsystem = "subsystem"
model = Immutable.fetch('uuid')
mock_send_request.assert_called_with('GET', 'api/subsystem/%s/resource_uri/uuid' % Immutable._api_version)
self.assertIsInstance(model, Immutable)
self.assertEqual('value', model.key)
try:
ret_json = {"key": "value"}
mock_send_request.return_value = ret_json
Immutable.endpoint = 'endpoint'
Immutable.subsystem = "subsystem"
model = Immutable.fetch('uuid')
mock_send_request.assert_called_with('GET', 'api/subsystem/%s/endpoint/uuid' % Immutable._api_version)
self.assertIsInstance(model, Immutable)
self.assertEqual('value', model.key)
finally:
if hasattr(Immutable, 'endpoint'):
delattr(Immutable, 'endpoint')
@mock.patch('dockercloud.api.base.send_request')
def test_immutable_list(self, mock_send_request):
try:
delattr(Immutable, 'endpoint')
delattr(Immutable, 'subsystem')
except:
pass
self.assertRaises(AssertionError, Immutable.list)
kwargs = {'key': 'value'}
ret_json = {"meta": {"limit": 25, "next": None, "offset": 0, "previous": None, "total_count": 1},
"objects": [{"key": "value1"}, {"key": "value2"}]}
mock_send_request.return_value = ret_json
Immutable.endpoint = 'fake'
Immutable.subsystem = "subsystem"
models = Immutable.list(**kwargs)
mock_send_request.assert_called_with('GET', 'api/subsystem/%s/fake' % Immutable._api_version, params=kwargs)
self.assertEqual(2, len(models))
self.assertIsInstance(models[0], Immutable)
self.assertEqual('value1', models[0].key)
self.assertIsInstance(models[1], Immutable)
self.assertEqual('value2', models[1].key)
try:
kwargs = {'key': 'value'}
ret_json = {"meta": {"limit": 25, "next": None, "offset": 0, "previous": None, "total_count": 1},
"objects": [{"key": "value1"}, {"key": "value2"}]}
mock_send_request.return_value = ret_json
Immutable.endpoint = 'fake'
models = Immutable.list(**kwargs)
mock_send_request.assert_called_with('GET', 'api/subsystem/%s/fake' % Immutable._api_version, params=kwargs)
self.assertEqual(2, len(models))
self.assertIsInstance(models[0], Immutable)
self.assertEqual('value1', models[0].key)
self.assertIsInstance(models[1], Immutable)
self.assertEqual('value2', models[1].key)
finally:
if hasattr(Immutable, 'endpoint'):
delattr(Immutable, 'endpoint')
@mock.patch('dockercloud.api.base.send_request')
def test_immutable_refresh(self, mock_send_request):
try:
model = Immutable()
model.key = 'value'
self.assertFalse(model.refresh(force=False))
model = Immutable()
model.key = 'value'
self.assertFalse(model.refresh(force=False))
self.assertRaises(dockercloud.ApiError, model.refresh, force=True)
self.assertRaises(dockercloud.ApiError, model.refresh, force=True)
Immutable.endpoint = 'endpoint'
Immutable.subsystem = 'subsystem'
model._detail_uri = 'api/subsystem/%s/endpoint/uuid' % Immutable._api_version
mock_send_request.side_effect = [{'newkey': 'newvalue'}, None]
self.assertTrue(model.refresh(force=True))
self.assertEqual('newvalue', model.newkey)
mock_send_request.assert_called_with('GET', model._detail_uri)
Immutable.endpoint = 'resource_uri'
Immutable.subsystem = 'subsystem'
model.resource_uri = 'api/subsystem/%s/resource_uri/uuid' % Immutable._api_version
model._resource_uri = model.resource_uri
mock_send_request.side_effect = [{'newkey': 'newvalue'}, None]
self.assertTrue(model.refresh(force=True))
self.assertEqual('newvalue', model.newkey)
mock_send_request.assert_called_with('GET', model._resource_uri)
self.assertFalse(model.refresh(force=True))
mock_send_request.assert_called_with('GET', model._resource_uri)
self.assertFalse(model.refresh(force=True))
mock_send_request.assert_called_with('GET', model._detail_uri)
finally:
if hasattr(Immutable, 'endpoint'):
delattr(Immutable, 'endpoint')
class MutableTestCase(unittest.TestCase):
@ -196,56 +196,57 @@ class MutableTestCase(unittest.TestCase):
@mock.patch('dockercloud.api.base.send_request')
def test_mutable_delete(self, mock_send_request):
model = Mutable()
self.assertRaises(dockercloud.ApiError, model.delete)
try:
model = Mutable()
self.assertRaises(dockercloud.ApiError, model.delete)
Mutable.endpoint = 'fake'
model._resource_uri = 'fake/uuid'
mock_send_request.side_effect = [{'key': 'value'}, None]
self.assertTrue(model.delete())
self.assertEqual('value', model.key)
mock_send_request.assert_called_with('DELETE', 'fake/uuid')
Mutable.endpoint = 'fake'
model._detail_uri = 'fake/uuid'
mock_send_request.side_effect = [{'key': 'value'}, None]
self.assertTrue(model.delete())
self.assertEqual('value', model.key)
mock_send_request.assert_called_with('DELETE', 'fake/uuid')
model = Mutable()
model._resource_uri = 'fake/uuid'
self.assertTrue(model.delete())
self.assertIsNone(model._resource_uri)
self.assertFalse(model.is_dirty)
self.assertTrue(model.delete())
self.assertIsNone(model._detail_uri)
self.assertFalse(model.is_dirty)
finally:
if hasattr(Mutable, 'endpoint'):
delattr(Mutable, 'endpoint')
@mock.patch('dockercloud.api.base.send_request')
def test_mutable_save(self, mock_send_request):
self.assertTrue(Mutable().save())
model = Mutable()
model.key = 'value'
try:
delattr(Immutable, 'endpoint')
delattr(Immutable, 'subsystem')
except:
pass
self.assertRaises(AssertionError, model.save)
self.assertTrue(Mutable().save())
Mutable.endpoint = 'resource_uri'
Mutable.subsystem = 'subsystem'
mock_send_request.return_value = None
result = model.save()
mock_send_request.assert_called_with('POST', 'api/subsystem/%s/resource_uri' % Mutable._api_version,
data=json.dumps({'key': 'value'}))
self.assertFalse(result)
model = Mutable()
model.key = 'value'
self.assertRaises(AssertionError, model.save)
mock_send_request.return_value = {'newkey': 'newvalue'}
result = model.save()
mock_send_request.assert_called_with('POST', 'api/subsystem/%s/resource_uri' % Mutable._api_version,
data=json.dumps({'key': 'value'}))
self.assertTrue(result)
self.assertEqual('newvalue', model.newkey)
Mutable.endpoint = 'endpoint'
Mutable.subsystem = 'subsystem'
mock_send_request.return_value = None
result = model.save()
mock_send_request.assert_called_with('POST', 'api/subsystem/%s/endpoint' % Mutable._api_version,
data=json.dumps({'key': 'value'}))
self.assertFalse(result)
model.key = 'another value'
mock_send_request.return_value = {'newkey2': 'newvalue2'}
model._resource_uri = 'api/subsystem/%s/resource_uri/uuid' % Immutable._api_version
result = model.save()
mock_send_request.assert_called_with('PATCH', 'api/subsystem/%s/resource_uri/uuid' % Mutable._api_version,
data=json.dumps({'key': 'another value'}))
self.assertTrue(result)
self.assertEqual('another value', model.key)
self.assertEqual('newvalue2', model.newkey2)
mock_send_request.return_value = {'newkey': 'newvalue'}
result = model.save()
mock_send_request.assert_called_with('POST', 'api/subsystem/%s/endpoint' % Mutable._api_version,
data=json.dumps({'key': 'value'}))
self.assertTrue(result)
self.assertEqual('newvalue', model.newkey)
model.key = 'another value'
mock_send_request.return_value = {'newkey2': 'newvalue2'}
model._detail_uri = 'api/subsystem/%s/endpoint/uuid' % Immutable._api_version
result = model.save()
mock_send_request.assert_called_with('PATCH', 'api/subsystem/%s/endpoint/uuid' % Mutable._api_version,
data=json.dumps({'key': 'another value'}))
self.assertTrue(result)
self.assertEqual('another value', model.key)
self.assertEqual('newvalue2', model.newkey2)
finally:
if hasattr(Mutable, 'endpoint'):
delattr(Mutable, 'endpoint')

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *

View file

@ -1,12 +1,9 @@
from __future__ import absolute_import
import requests
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import requests
import unittest.mock as mock
import dockercloud
from dockercloud.api.base import send_request

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *
@ -43,7 +40,7 @@ class NodeClusterTestCase(unittest.TestCase):
)
mock_send.return_value = fake_resp(fake_nodecluster_save)
cluster = dockercloud.NodeCluster.create(name="my_cluster", region="/api/v1/region/digitalocean/lon1/",
node_type="/api/v1/nodetype/digitalocean/1gb/")
node_type="/api/v1/nodetype/digitalocean/1gb/")
self.assertTrue(cluster.save())
result = json.loads(json.dumps(cluster.get_all_attributes()))
target = json.loads(json.dumps(attribute))
@ -56,7 +53,7 @@ class NodeClusterTestCase(unittest.TestCase):
)
mock_send.side_effect = [fake_resp(fake_nodecluster_save), fake_resp(fake_nodecluster_deploy)]
cluster = dockercloud.NodeCluster.create(name="my_cluster", region="/api/v1/region/digitalocean/lon1/",
node_type="/api/v1/nodetype/digitalocean/1gb/")
node_type="/api/v1/nodetype/digitalocean/1gb/")
cluster.save()
self.assertTrue(cluster.deploy())
result = json.loads(json.dumps(cluster.get_all_attributes()))

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *

View file

@ -2,10 +2,7 @@ from __future__ import absolute_import
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from .fake_api import *

View file

@ -1,9 +1,6 @@
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import unittest.mock as mock
import dockercloud
from dockercloud.api.exceptions import ObjectNotFound, ApiError, NonUniqueIdentifier