[sldev-commits] r104 - trunk/certified_http

which.linden at svn.secondlife.com which.linden at svn.secondlife.com
Tue Dec 11 16:56:14 PST 2007


Author: which.linden
Date: 2007-12-11 18:56:14 -0600 (Tue, 11 Dec 2007)
New Revision: 104

Added:
   trunk/certified_http/escrow_rsrc.py
   trunk/certified_http/escrow_test.py
Modified:
   trunk/certified_http/agentdb_test.py
   trunk/certified_http/dbmgr.py
   trunk/certified_http/escrow.py
Trac: http://svn.secondlife.com/trac/certified_http/changeset/104
Log:
started escrow service (shuffle) and simple resource (RSRC) types ("money" and no-copy-objects) with a simple test (one purchase with fixed users and object); paired by which and seep

Modified: trunk/certified_http/agentdb_test.py
===================================================================
--- trunk/certified_http/agentdb_test.py	2007-12-11 18:26:42 UTC (rev 103)
+++ trunk/certified_http/agentdb_test.py	2007-12-12 00:56:14 UTC (rev 104)
@@ -542,11 +542,13 @@
 
 class AgentdbProxy(object):
     def post_(self, url, body, aux=None):
+        print "agentdbproxy: POST url=%s body=%s" % (url,body)
         status, headers, rv = testclient.post_(url, body, ok=(200,403,503), aux=aux)
         if status == 403:
             raise Failed(rv)
         elif status == 503:
             raise Fault(rv)
+        print "agentdbproxy: rv=%s" % (rv,)
         return rv
 
     def get_userpart(self, op, url, user):

Modified: trunk/certified_http/dbmgr.py
===================================================================
--- trunk/certified_http/dbmgr.py	2007-12-11 18:26:42 UTC (rev 103)
+++ trunk/certified_http/dbmgr.py	2007-12-12 00:56:14 UTC (rev 104)
@@ -106,7 +106,7 @@
                 print "%s: [%s]RS: %s" % (tid(),db.thread_id(),rv); stdout.flush()
             return rv
         finally:
-            if c0 is not None: c0.close()
+            if c0: c0.close()
 
     def rundict(self, db, name, params=[], onfailure=None):
         assert(db)

Modified: trunk/certified_http/escrow.py
===================================================================
--- trunk/certified_http/escrow.py	2007-12-11 18:26:42 UTC (rev 103)
+++ trunk/certified_http/escrow.py	2007-12-12 00:56:14 UTC (rev 104)
@@ -18,45 +18,83 @@
 """
 
 
+import traceback
 from certified_http import client, server
 
 
+class RSRC(object):
+    """
+    The stand-in in the escrow server for a resource that can participate in escrow operations.
+    """
+    """ constructor is passed identifying information about the resource """
+    def __init__(self, value):
+        pass
+
+    """ gather this resource from the named owner """
+    def gather(self, oplog, owner):
+        pass
+    """ undo the gathering from that named owner """
+    def undo(self, oplog):
+        pass
+    """ deliver the resource to a named new owner """
+    def deliver(self, oplog, newowner):
+        pass
+
+
+def inorder_items(a_dict):
+    keys = a_dict.keys()
+    keys.sort()
+    for k in keys:
+        yield k, a_dict[k]
+
 class Escrow(server.Resource):
+    def __init__(self, persistence, resource_suite):
+        super(Escrow, self).__init__(persistence)
+        self._resource_suite = resource_suite
+        
     def handle_post(self, oplog, request):
         body = request.read_body()
+        print "Escrow: body=%s" % (body,)
         operation = body['operation']
         contents = body['contents']
         metadata = body['metadata']
         if operation == 'shuffle':
             self.shuffle(oplog, request, contents, metadata)
 
-    def gather(self, oplog, resource):
-        suite = self.get_plugin(resource['type'])
-        return suite.gather(oplog, resource['owner'], resource['value']))
-
-    def undo(self, oplog, resource):
-        suite = self.get_plugin(resource['type'])
-        return suite.undo(oplog, resource['owner'], resource['value']))
-
-
     def shuffle(self, oplog, request, contents, metadata):
+        print "shuffle: contents=%s" % (contents,)
         gather = contents['gather']
         distribute = contents['distribute']
         
-        gathered = []
+        gathered = {}
         try:
             for resource in gather:
-                gathered.append(self.gather(oplog, resource))
+                rsrc_klass = self.get_plugin(resource['type'])
+                print "gathering resource", resource['type'], resource['value'], rsrc_klass
+                rsrc = rsrc_klass(resource['value'])
+                rsrc.gather(oplog, resource['owner'])
+                gathered[resource['name']] = rsrc
         except:
-            for undoable in gathered:
-                self.undo(oplog, undoable)
+            print "exception in gathering"
+            traceback.print_exc()
+            for name, undoable in inorder_items(gathered):
+                print "undoing resource", name, undoable['type'], undoable['value']
+                undoable.undo(oplog)
+            req.write('"so NOT done"')
+            return
 
         # line of no return
+        print "Escrow: Line of no return"
 
-        for resource in gathered:
-            done = False
-            while not done:
-                target = filter(lambda x: x['name'] == resource['name'], distribute)[0]
-                self.deliver(resource, target)
-                done = True
+        for name, rsrc in inorder_items(gathered):
+            print "delivering resource", name, resource
+            target = distribute[name]
+            rsrc.deliver(oplog, target)
+
+        request.write('"so done"')
             
+    def get_plugin(self,restype):
+        try:
+            return self._resource_suite[restype]
+        except KeyError:
+            raise AssertionError("The resource type %s does not exist" % (restype,))

Added: trunk/certified_http/escrow_rsrc.py
===================================================================
--- trunk/certified_http/escrow_rsrc.py	                        (rev 0)
+++ trunk/certified_http/escrow_rsrc.py	2007-12-12 00:56:14 UTC (rev 104)
@@ -0,0 +1,101 @@
+"""\
+ at file escrow_rsrc.py
+
+Example resources for the escrow.
+
+Copyright (c) 2007, Linden Research, Inc.
+Copyright (c) 2007, IBM Corp.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from certified_http import escrow
+from certified_http.agentdb import Agentdb, DBMgr
+from certified_http.agentdb_test import AgentdbProxy
+
+_g_dbmgr = DBMgr({})
+
+_agent0url = "http://localhost:9903/agent0"
+a0 = AgentdbProxy()
+
+# "Valueless Inworld Currency" ha ha
+# The state of a Vic is either (amount) in which case it isn't
+# really the Vic, but the "idea" of the Vic, or (amount, owner, owner_url)
+# in which case it is the actual Vic that has been acquired from that owner.
+#
+class Vic(escrow.RSRC):
+    def __init__(self, value):
+        self.amount = value
+
+    def gather(self, oplog, owner):
+        user_url = a0.get_userpart(oplog, _agent0url, owner)
+        a0.deduct(oplog, user_url, owner, self.amount)
+        self.owner = owner
+        self.owner_url = user_url
+        print "Vic: gather deduct Vic %s from user %s" % (self.amount, self.owner)
+        
+    def undo(self, oplog):
+        print "Vic: undo credit Vic %s to user %s" % (self.amount, self.owner)
+        assert(hasattr(self, 'owner'))
+        a0.credit(oplog, self.owner_url, self.owner, self.amount)
+        del self.owner
+        del self.owner_url
+
+    """ deliver the resource to a named new owner """
+    def deliver(self, oplog, newowner):
+        print "Vic: deliver credit Vic %s to user %s" % (self.amount, newowner)
+        assert(hasattr(self, 'owner'))
+        newowner_url = a0.get_userpart(oplog, _agent0url, newowner)
+        a0.credit(oplog, newowner_url, newowner, self.amount)
+        del self.owner
+        del self.owner_url
+
+# "No Copy Object"
+# The state of a NoCopyObject is either (object-id) in which case it isn't
+# the actual object, but just the idea of the object, or (object-id, description, owner),
+# in which case it is the actual NoCopyObject that has been acquired from that owner.
+#
+class NoCopyObject(escrow.RSRC):
+    def __init__(self, value):
+        # TODO insert some check for valid OIDs
+        self.oid = value
+    
+    def gather(self, oplog, owner):
+        user_url = a0.get_userpart(oplog, _agent0url, owner)
+        description = a0.acquire(oplog, user_url, owner, self.oid)
+        self.description = description
+        self.owner = owner
+        self.owner_url = user_url
+        print "NCO: gather acquire %s(%s) from user %s" % (self.oid, self.description, self.owner)
+
+    def undo(self, oplog):
+        print "NCO: undo %s(%s) to user %s" % (self.oid, self.description, self.owner)
+        assert(hasattr(self, 'owner'))
+        a0.deliver(oplog, self.owner_url, self.owner, self.oid, self.description)
+        del self.description
+        del self.owner
+        del self.owner_url
+
+    def deliver(self, oplog, newowner):
+        print "NCO: deliver %s(%s) to user %s" % (self.oid, self.description, newowner)
+        assert(hasattr(self, 'owner'))
+        newowner_url = a0.get_userpart(oplog, _agent0url, newowner)
+        a0.deliver(oplog, newowner_url, newowner, self.oid, self.description)
+        del self.description
+        del self.owner
+        del self.owner_url
+
+_resource_suite = {
+    'Vic' : Vic,
+    'NoCopyObject' : NoCopyObject
+    }

Added: trunk/certified_http/escrow_test.py
===================================================================
--- trunk/certified_http/escrow_test.py	                        (rev 0)
+++ trunk/certified_http/escrow_test.py	2007-12-12 00:56:14 UTC (rev 104)
@@ -0,0 +1,603 @@
+"""\
+ at file agentdb_test.py
+
+Copyright (c) 2007, IBM Corp.
+Copyright (c) 2007, Linden Research, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import cStringIO
+import simplejson
+import shutil
+import tempfile
+import unittest
+import uuid
+
+from certified_http import fault_injector, escrow_rsrc
+from certified_http.fault_injector import Fault
+
+from certified_http import mysql_persist, server, client
+from mysql_persist_test import auth, create_db, drop_db
+
+from certified_http import agentdb, escrow
+
+import agentdb
+from agentdb import AgentDBResource, Failed
+
+from mulib import mu
+from eventlet import coros, api, httpd, util, httpc
+
+util.wrap_socket_with_coroutine_socket()
+
+class DummyDBMaker(object):
+    def __init__(self,db):
+        self._db = db
+
+    def db(self):
+        return self._db
+
+    def persist(self,method,*args, **kwargs):
+        kwargs.pop('exceptions',())
+        return apply(method,(self,)+args, kwargs)
+
+def fun_shuffle(op,r,l):
+    l = list(l)
+    r.shuffle(l)
+    return l
+
+def simple_purchase_transaction(op, _agentdb, buyer, seller, amount, oid):
+
+    op.persist(_agentdb.deduct, buyer, amount, exceptions=Failed)
+    try:
+        description = op.persist(_agentdb.acquire, seller, oid, exceptions=Failed)
+    except Failed, f:
+        op.persist(_agentdb.credit, buyer, amount)
+        raise f
+
+    op.persist(_agentdb.credit, seller, amount)
+    op.persist(_agentdb.deliver, buyer, oid, description)
+    return True
+
+def total_money(*agents):
+    def agent_money(agent):
+        dbmgr = agent._dbmgr
+        db = None
+        try:
+            db = agent._dbmgr.connect()
+            return int(dbmgr.rundict(db, 'count-total-money')['money'])
+        finally:
+            if db: agent._dbmgr.close(db)
+    return reduce((lambda a,b:a+b), [agent_money(a) for a in agents])
+
+def entire_inventory(*agents):
+    def agent_inventory(agent):
+        dbmgr = agent._dbmgr
+        db = None
+        try:
+            db = agent._dbmgr.connect()
+            rs = dbmgr.run(db, 'entire-inventory')
+            l = [(e['oid'],e['description'].tostring()) for e in rs]
+            return l
+        finally:
+            if db: agent._dbmgr.close(db)
+    l = reduce((lambda a,b:a+b),[agent_inventory(a) for a in agents])
+    l.sort()
+    return l
+
+def all_users(*agents):
+    def agent_users(agent):
+        dbmgr = agent._dbmgr
+        db = None
+        try:
+            db = agent._dbmgr.connect()
+            rs = dbmgr.run(db, 'all-users')
+            return [e['uid'] for e in rs]
+        finally:
+            if db: agent._dbmgr.close(db)
+    return reduce((lambda a,b:a+b),[agent_users(a) for a in agents])
+
+def count_inventory(*agents):
+    def agent_inventory(agent):
+        dbmgr = agent._dbmgr
+        db = None
+        try:
+            db = agent._dbmgr.connect()
+            return dbmgr.rundict(db, 'count-inventory')['nitems']
+        finally:
+            if db: agent._dbmgr.close(db)
+    return reduce((lambda a,b:a+b),[agent_inventory(a) for a in agents])
+
+def count_users(*agents):
+    def agent_users(agent):
+        dbmgr = agent._dbmgr
+        db = None
+        try:
+            db = agent._dbmgr.connect()
+            rv = dbmgr.rundict(db, 'count-users')['nusers']
+            print "agent_users(%s): %s" % (agent._url, rv)
+            return rv
+        finally:
+            if db: agent._dbmgr.close(db)
+    return reduce((lambda a,b:a+b),[agent_users(a) for a in agents])
+
+class TestAgentdb(unittest.TestCase):
+    def setUp(self):
+        global auth
+        create_db(auth)
+        self._agent = agentdb.AgentDBResource(auth, url='uri:none')
+        self._agent._setup()
+        self._agentdb = self._agent._agentdb
+        self._dbmgr = self._agent._dbmgr
+
+    def tearDown(self):
+        del self._dbmgr
+        del self._agentdb
+        self._agent._teardown()
+        import dbmgr
+        del self._agent
+        drop_db(auth)
+
+    def test_adduser(self):
+        db = None
+        try:
+            db = self._dbmgr.connect()
+            op = DummyDBMaker(db)
+            self._agentdb.adduser(op,'jake',10)
+            self._dbmgr.commit(db)
+            self.assertEqual(self._agentdb.balance(op, 'jake'), 10)
+        finally:
+            if db: self._dbmgr.close(db)
+
+    def test_deluser(self):
+        db = None
+        try:
+            db = self._dbmgr.connect()
+            op = DummyDBMaker(db)
+            self._agentdb.adduser(op,'jake',10)
+            self._dbmgr.commit(db)
+            self._agentdb.deluser(op,'jake')
+            self._dbmgr.commit(db)
+            self.assertRaises(Failed, self._agentdb.balance, op, 'jake')
+        finally:
+            if db: self._dbmgr.close(db)
+
+    def test_simple_purchase(self):
+        db = None
+        try:
+            db = self._dbmgr.connect(); op = DummyDBMaker(db)
+            self._agentdb.adduser(op,'chet',100)
+            self._agentdb.adduser(op,'joe',100)
+            self._agentdb.deliver(op,'joe','1234-5678','An object')
+            self._dbmgr.commit(db)
+            self._dbmgr.close(db); db = None
+            inv0 = entire_inventory(self._agent)
+            db = self._dbmgr.connect(); op = DummyDBMaker(db)
+            simple_purchase_transaction(op, self._agentdb, 'chet','joe',10,'1234-5678')
+            self._dbmgr.close(db); db = None
+            self.assertEqual(total_money(self._agent),200)
+            inv1 = entire_inventory(self._agent)
+            self.assertEqual(inv0,inv1)
+        finally:
+            if db: self._dbmgr.close(db)
+
+_nusers= 1
+_ndestitute = 2
+_nhomeless = 3
+_initial_balance=100
+_objs_per_user=10
+
+import random
+r = random.WichmannHill()
+_seed = 0x31337
+_num_purchases = 30
+_concurrency = 10
+
+class FailingResource(mu.Resource):
+    def handle_post(self,req):
+        assert(False)
+
+    def handle_get(self,req):
+        assert(False)
+
+    def handle_delete(self,req):
+        assert(False)
+
+class AgentdbPersistTest(unittest.TestCase):
+    def setUp(self):
+        if not(hasattr(self,'_nagents')):
+            self._nagents = 1
+        r.seed(_seed)
+        self.root = {}
+        self.setUp_agents()
+        self.setUp_purchaser()
+        self.setUp_client()
+        self.start_server()
+
+    def setUp_agents(self):
+        global auth
+        print "#agents: %s"% (self._nagents,)
+        self._agents = []
+        self._agent2users = {}
+        for i in range(0,self._nagents):
+            agent_auth = auth.copy()
+            agent_auth['db'] += 'agent' + str(i)
+            create_db(agent_auth)
+            persist = mysql_persist.Persistence(agent_auth)
+            self.root['agent'+str(i)+'monitor'] = persist.get_monitor()
+            self._agents.append(agentdb.AgentDBResource(persist=persist, url='http://localhost:9903/agent'+str(i)))
+            self._agents[i]._setup()
+            self._agent2users[i] = self._populate_agent(self._agents[i],'a'+str(i))
+        self._special_users = self._populate_special_users(self._agents[0])
+
+        for i in range(0,len(self._agents)):
+            _agentdb = self._agents[i]._agentdb
+            db = None
+            try:
+                db = _agentdb._dbmgr.connect() ; op = DummyDBMaker(db)
+                for i in range(0,len(self._agents)):
+                    for u in self._agent2users[i]:
+                        _agentdb.set_userpart(op, u, "http://localhost:9903/agent"+str(i),i)
+                for u in self._special_users:
+                    _agentdb.set_userpart(op, u, "http://localhost:9903/agent"+str(0),0)
+                _agentdb._dbmgr.commit(db)
+            finally:
+                if db: _agentdb._dbmgr.close(db)
+
+    def tearDown_agents(self):
+        self._agents[0]._teardown()
+        for i in range(0,self._nagents):
+            drop_db(self._agents[i]._auth)
+        del self._agents
+
+    def _populate_agent(self, agent,prefix):
+        _agentdb = agent._agentdb
+        _dbmgr = agent._dbmgr
+        db = None
+        try:
+            db = agent._persister.connect(); op = DummyDBMaker(db)
+            users = []
+            for i in range(0,_nusers):
+                user = prefix+'user'+str(i)
+                users.append(user)
+                _agentdb.adduser(op, user,_initial_balance)
+                for j in range(0,_objs_per_user):
+                    _agentdb.deliver(op, user, prefix+'u'+str(i)+'obj'+str(j), ("agent %s user %s object %s" % (prefix, i,j)))
+                _dbmgr.commit(db)
+            for i in range(0,_ndestitute):
+                user = prefix+'destitute-user'+str(i)
+                users.append(user)
+                _agentdb.adduser(op, user,0)
+                for j in range(0,_objs_per_user):
+                   _agentdb.deliver(op,user, prefix+'du'+str(i)+'obj'+str(j), ("agent %s destitute user %s object %s" % (prefix,i,j)))
+                _dbmgr.commit(db)
+            for i in range(0,_nhomeless):
+                user = prefix+'homeless-user'+str(i)
+                users.append(user)
+                _agentdb.adduser(op, user,_initial_balance)
+                _dbmgr.commit(db)
+        finally:
+            if db: _dbmgr.close(db)
+        return users
+
+    def _populate_special_users(self, agent):
+        _agentdb = agent._agentdb
+        _dbmgr = agent._dbmgr
+        db = None
+        try:
+            db = agent._persister.connect(); op = DummyDBMaker(db)
+            _agentdb.adduser(op,'chet',100)
+            _agentdb.adduser(op,'joe',100)
+            _agentdb.deliver(op,'joe','1234-5678','An object')
+            _dbmgr.commit(db)
+        finally:
+            if db: _dbmgr.close(db)
+        return ['chet','joe']
+
+    def start_server(self):
+        # set up our echo server
+        for i in range(0,len(self._agents)):
+            self.root['agent'+str(i)] = self._agents[i]
+        self.logfile = cStringIO.StringIO()
+        #self.killer = api.spawn(
+        #    httpd.server, api.tcp_listener(('0.0.0.0', 9903)), mu.SiteMap(self.root), log=self.logfile)
+        self.killer = api.spawn(
+            httpd.server, api.tcp_listener(('0.0.0.0', 9903)), mu.SiteMap(self.root))
+
+    def stop_server(self):
+        api.kill(self.killer)
+
+    def setUp_purchaser(self):
+        self.root['purchase'] = FailingResource()
+
+    def tearDown_purchaser(self):
+        pass
+
+    def setUp_client(self):
+        self._client_persist = self._agents[0]._persister
+
+    def tearDown_client(self):
+        del self._client_persist
+
+    def tearDown(self):
+        self.stop_server()
+        self.tearDown_client()
+        self.tearDown_purchaser()
+        self.tearDown_agents()
+        del self.root
+
+    def test_simple_persistence(self):
+        inv0 = entire_inventory(*tuple(self._agents))
+        money0 = total_money(*tuple(self._agents))
+        op = self._client_persist.get_oplog('stuff')
+        rv = self.simple_purchase_transaction_proxy(op, 'chet','joe',10,'1234-5678')
+        self.assertEqual(rv,True)
+        inv1 = entire_inventory(*tuple(self._agents))
+        money1 = total_money(*tuple(self._agents))
+        self.assertEqual(inv0,inv1)
+        self.assertEqual(money0,money1)
+
+    def test_simple_failure(self):
+        op = self._client_persist.get_oplog('stuff')
+        self.assertRaises(Failed,self.simple_purchase_transaction_proxy, op,  'chet','joe',10000000,'1234-5678')
+
+    def test_replay_failure(self):
+        op = self._client_persist.get_oplog('stuff')
+        self.assertRaises(Failed,self.simple_purchase_transaction_proxy, op,  'chet','joe',10000000,'1234-5678')
+        op.reset()
+        self.assertRaises(Failed,self.simple_purchase_transaction_proxy, op,  'chet','joe',10000000,'1234-5678')
+
+    def test_user_setup(self):
+        self.assertEqual(count_users(*tuple(self._agents)), self._nagents * (_nusers + _ndestitute + _nhomeless) + 2)
+        self.assertEqual(count_inventory(*tuple(self._agents)), self._nagents * ((_nusers+_ndestitute) * _objs_per_user) + 1)
+        for i in range(0,len(self._agents)):
+            agent = self._agents[i]
+            dbmgr = agent._dbmgr
+            db = None
+            try:
+                db = dbmgr.connect(); op = DummyDBMaker(db)
+                for j in range(0,len(self._agent2users)):
+                    users = self._agent2users[j]
+                    for u in users:
+                        self.assertEqual(j,agent._agentdb.get_userpart(op,u)[1])
+                for u in self._special_users:
+                    self.assertEqual(0,agent._agentdb.get_userpart(op,u)[1])
+            finally:
+                if db: dbmgr.close(db)
+
+# select two users at random
+# randomly select one of them to be the seller
+# if the seller doesn't have an object, make up a bogus OID
+# otherwise select randomly a seller's object
+# price is randomly in [0, 2 * buyer's balance)
+
+    def random_purchase(self, locks, op, all_users):
+        u1,u2 = op.persist((lambda op,l,n: r.sample(l,n)), all_users,2)
+        if locks.has_key(u1) or locks.has_key(u2):
+            return (u1,u2,0,"")
+        else:
+            locks[u1] = u1
+            locks[u2] = u2
+        try:
+            assert(u1 != u2)
+            users = list( (u1, u2) )
+            users = op.persist(fun_shuffle,r,users)
+            buyer, seller = users
+            sellers_inventory = self.sellers_inventory_proxy(op, seller)
+            if not sellers_inventory:
+                buy_oid = 'bogus-oid-'+str(r.random())
+            else:
+                buy_oid = r.sample(sellers_inventory,1)[0]
+
+            buyers_balance = self.buyers_balance_proxy(op, buyer)
+            price = r.randint(0,2*buyers_balance+1)
+            try:
+                self.simple_purchase_transaction_proxy(op, buyer, seller, price, buy_oid)
+                return None
+            except Failed:
+                return (buyer, seller, price, buy_oid)
+        finally:
+            del locks[u1]
+            del locks[u2]
+
+    def simple_purchase_transaction_proxy(self, op, buyer, seller, price, buy_oid):
+        assert(len(self._agents) == 1)
+        return simple_purchase_transaction(op, self._agents[0]._agentdb, buyer, seller, price, buy_oid)
+
+    def sellers_inventory_proxy(self, op, seller):
+        assert(len(self._agents) == 1)
+        def doit(op):
+            return [e['oid']
+                    for e in self._agents[0]._dbmgr.run(op.db(),'inventory-oids',{'uid': seller})]
+
+        return op.persist(doit)
+
+    def buyers_balance_proxy(self, op, buyer):
+        assert(len(self._agents) == 1)
+        def doit(op):
+            return int(self._agents[0]._dbmgr.rundict(op.db(),'account-balance',{'uid': buyer})['balance'])
+
+        return op.persist(doit)
+
+    def test_many_purchases(self):
+        print "test_many_purchases"
+        entire_user_list = all_users(*tuple(self._agents))
+        inv0 = entire_inventory(*tuple(self._agents))
+        money0 = total_money(*tuple(self._agents))
+        failed_count = 0
+        locks = {}
+        for _temp in range(0,_num_purchases):
+            mid = str(uuid.uuid4())
+            op = self._client_persist.get_oplog(mid)
+            if self.random_purchase(locks, op, entire_user_list):
+                failed_count += 1
+            self._client_persist.tombstone(op)
+        self.assertEqual(total_money(*tuple(self._agents)),money0)
+        inv1 = entire_inventory(*tuple(self._agents))
+        self.assertEqual(inv0,inv1)
+
+        print "Failed", failed_count
+
+    def test_concurrent_purchases(self):
+        print "test_concurrent_purchases"
+        inv0 = entire_inventory(*tuple(self._agents))
+        money0 = total_money(*tuple(self._agents))
+        entire_user_list = all_users(*tuple(self._agents))
+        pool = coros.CoroutinePool(max_size=_concurrency)
+        locks = {}
+        def doit(_id):
+            failed_count = 0
+            print ">id", _id
+            for _temp in range(0,int(_num_purchases/_concurrency)):
+                mid = str(uuid.uuid4())
+                op = self._client_persist.get_oplog(mid)
+                if self.random_purchase(locks, op, entire_user_list):
+                    failed_count += 1
+                self._client_persist.tombstone(op)
+            print "<id", _id
+            return failed_count
+        waiters = []
+        for _temp in range(0,_concurrency):
+            waiters.append(pool.execute(doit, _temp))
+        totfailed = 0
+        for waiter in waiters:
+            totfailed += waiter.wait()
+
+        self.assertEqual(total_money(*tuple(self._agents)),money0)
+        inv1 = entire_inventory(*tuple(self._agents))
+        self.assertEqual(inv0,inv1)
+        print "total failed: ",totfailed
+
+testclient = client.CertifiedHttpSuite(retry_interval=(0.01, 0.1), loader=simplejson.loads, dumper=simplejson.dumps, fallback_content_type='application/json')
+
+purchaseurl = "http://localhost:9903/purchase"
+agent0url = "http://localhost:9903/agent0"
+class RPCAgentdbPersistTest(AgentdbPersistTest):
+    def simple_purchase_transaction_proxy(self, op, buyer, seller, price, buy_oid):
+        body = {
+            'operation': 'shuffle',
+            'contents' :
+            { 'gather':
+              [ { 'name': 'the-money', 'owner': buyer, 'type': 'Vic', 'value': price },
+                { 'name': 'the-obj', 'owner': seller, 'type': 'NoCopyObject', 'value': buy_oid },
+                ],
+              'distribute':
+              [ { 'name': 'the-money', 'owner': seller },
+                { 'name': 'the-obj', 'owner': buyer } ]
+              },
+            'metadata' : {}
+            }
+        status, headers, rv = testclient.post_(purchaseurl, body, ok=(200, 403,503), aux=op)
+        if status == 403:
+            raise Failed(rv)
+        elif status == 503:
+            raise Fault(rv)
+        return rv
+
+    def sellers_inventory_proxy(self, op, seller):
+        l = (AgentdbProxy()).inventory(op,agent0url,seller)
+        return [r['oid'] for r in l]
+
+    def buyers_balance_proxy(self, op, buyer):
+        return (AgentdbProxy()).balance(op,agent0url,buyer)
+ 
+    def setUp_purchaser(self):
+        self.fail()
+
+    def setUp_client(self):
+        global auth
+        self._client_auth = auth.copy()
+        self._client_auth['db'] += 'client'
+        create_db(self._client_auth)
+        mysql_persist._setup(**self._client_auth)
+        self._client_persist = mysql_persist.Persistence(self._client_auth)
+        self.root['client-monitor'] = self._client_persist.get_monitor()
+
+    def tearDown_client(self):
+        del self._client_persist
+        drop_db(self._client_auth)
+
+_nagents = 4
+
+a0 = escrow_rsrc.AgentdbProxy()
+
+class EscrowTest(RPCAgentdbPersistTest):
+    def setUp(self):
+        self._nagents = _nagents
+        super(EscrowTest, self).setUp()
+        print "================================================================"
+
+    def _make_purchase_resource(self):
+        return escrow.Escrow(self._purchaser_persist, escrow_rsrc._resource_suite)
+
+    def setUp_purchaser(self):
+        global auth
+        self._purchaser_auth = auth.copy()
+        self._purchaser_auth['db'] += 'purchaser'
+        create_db(self._purchaser_auth)
+        mysql_persist._setup(**self._purchaser_auth)
+        self._purchaser_persist = mysql_persist.Persistence(self._purchaser_auth)
+        self.root['purchaser-monitor'] = self._purchaser_persist.get_monitor()
+
+        self.root['purchase'] = self._make_purchase_resource()
+ 
+    def tearDown_purchaser(self):
+        del self._purchaser_persist
+        drop_db(self._purchaser_auth)
+
+    def simple_purchase_transaction(self, op, buyer, seller, price, buy_oid):
+        body = {
+            'operation': 'shuffle',
+            'contents' :
+            { 'gather':
+              [ { 'name': 'the-money', 'owner': buyer, 'type': 'Vic', 'value': price },
+                { 'name': 'the-obj', 'owner': seller, 'type': 'NoCopyObject', 'value': buy_oid },
+                ],
+              'distribute':
+              { 'the-money' : seller,
+                'the-obj' : buyer }
+              },
+            'metadata' : {}
+            }
+        status, headers, rv = testclient.post_(purchaseurl, body, ok=(200, 403,503), aux=op)
+        if status == 403:
+            raise Failed(rv)
+        elif status == 503:
+            raise Fault(rv)
+        return rv
+
+    def test_simple_purchase(self):
+        print 'joe', users_money(self._agents[0]._agentdb, 'joe')
+        print 'chet', users_money(self._agents[0]._agentdb, 'chet')
+        url = self._agents[0]._url
+        op = self._client_persist.get_oplog('stuff')
+        inv0 = entire_inventory(*tuple(self._agents))
+        money0 = total_money(*tuple(self._agents))
+        self.simple_purchase_transaction(op, 'chet','joe',10,'1234-5678')
+        self.assertEqual(total_money(*tuple(self._agents)),money0)
+        inv1 = entire_inventory(*tuple(self._agents))
+        self.assertEqual(inv0,inv1)
+        print 'joe', users_money(self._agents[0]._agentdb, 'joe')
+        print 'chet', users_money(self._agents[0]._agentdb, 'chet')
+
+def users_money(agent, user):
+    dbmgr = agent._dbmgr
+    db = None
+    try:
+        db = agent._dbmgr.connect()
+        return int(dbmgr.rundict(db, 'account-balance',{'uid':user})['balance'])
+    finally:
+        if db: agent._dbmgr.close(db)
+
+if __name__ == "__main__":
+    unittest.main()



More information about the sldev-commits mailing list