[sldev-commits] r86 - trunk/certified_http

which.linden at svn.secondlife.com which.linden at svn.secondlife.com
Tue Dec 4 17:58:54 PST 2007


Author: which.linden
Date: 2007-12-04 19:58:54 -0600 (Tue, 04 Dec 2007)
New Revision: 86

Modified:
   trunk/certified_http/mysql_persist.py
   trunk/certified_http/oplog.py
   trunk/certified_http/server.py
   trunk/certified_http/server_test.py
Trac: http://svn.secondlife.com/trac/certified_http/changeset/86
Log:
Changed tombstoned to a datetime from a boolean, because The More You Know! (tm)  Implemented concurrent reply reporting, building on the inbuilt detection.  Paired by Which and Seep

Modified: trunk/certified_http/mysql_persist.py
===================================================================
--- trunk/certified_http/mysql_persist.py	2007-12-05 01:06:57 UTC (rev 85)
+++ trunk/certified_http/mysql_persist.py	2007-12-05 01:58:54 UTC (rev 86)
@@ -29,17 +29,17 @@
 from sys import stdout
 
 from eventlet import api, httpdate
-from oplog import PersistenceError
+from oplog import PersistenceError, ConcurrentReplayError
 
 named_queries = {
     'all-oplogs': "select id from oplog",
-    'open-oplogs': "select id from oplog where resumable = 1",
+    'open-oplogs': "select id from oplog where resumable = 1 and tombstoned is NULL",
     'create-oplog': "insert into oplog(id,full_id, date,version,resumable,last_entry, replayed, created) values(%s, %s, %s, %s, %s, NULL, from_unixtime(%s), from_unixtime(%s))",
     'oplog-exists': "select 1 from oplog where id = %s",
     'oplog-version': "select version from oplog where id = %s",
     'delete-oplog-parent': "delete from oplog where id = %s",
     'delete-oplog-entries':"delete from oplog_entries where id = %s",
-    'set-tombstoned': "update oplog set tombstoned = %s where id = %s",
+    'set-tombstoned': "update oplog set tombstoned = from_unixtime(%s) where id = %s",
     'count-oplogs-by-id':"select count(*) as count from oplog where id = %s",
     'last-index': "select last_entry from oplog where id = %s",
     'set-last-index': "update oplog set last_entry = %s where id = %s",
@@ -50,7 +50,7 @@
     'get-fullid': "select full_id from oplog where id = %s",
     'get-date': "select date from oplog where id = %s",
     'get-replays': "select unix_timestamp(replayed) as replayed, replays from oplog where id = %s",
-    'get-tombstoned': "select tombstoned from oplog where id = %s",
+    'get-tombstoned': "select unix_timestamp(tombstoned) as tombstoned from oplog where id = %s",
     'increment-replays': 'update oplog set replays = replays+1 , replayed = from_unixtime(%s) where id = %s',
     }
 
@@ -124,10 +124,11 @@
         try:
             db = self.connect()
             rs = self.run(db, 'get-tombstoned',[mid])
-            was_alive = len(rs) > 0 and (not rs[0].get('tombstoned', False))
+            was_alive = len(rs) > 0 and (not rs[0].get('tombstoned', None))
             if was_alive:
+                tombstoned_time = time.time()
                 self.run(db, 'delete-oplog-entries', [mid])
-                self.run(db, 'set-tombstoned', [1, mid])
+                self.run(db, 'set-tombstoned', [tombstoned_time, mid])
                 db.commit()
             else:
                 if len(rs) == 0:
@@ -138,8 +139,8 @@
         # tombstone the in-memory copy
         if op is None:
             op = self._active_oplogs.get(mid)
-        if op:
-            op._persister._tombstoned = True
+        if op and not op.tombstoned():
+            op._persister._tombstoned = tombstoned_time
 
         return was_alive
 
@@ -205,14 +206,14 @@
                 self._fullid = fullid
                 self._date = date
                 self._version = version
-                self._tombstoned = False
+                self._tombstoned = None
                 try:
                     self.run(db, 'create-oplog',[str(self._id), fullid, date, self._version, int(resumable), now, now])
                 except IntegrityError:
                     raise oplog.PersistenceError("integrity error creating oplog with mid=%s, fullid=%s, date=%s" % (mid, fullid, date))
                 self._replayed,self._replays = self._replays_from_db(db)
                 self._created = self.run(db, 'get-created', [self._id])[0]['created']
-                db.commit()
+                self._dbmgr.commit(db)
                 self._to_be_stored = []
                 self._last = None
                 self._num_stored = 0
@@ -248,8 +249,7 @@
             self._created = self.run(db, 'get-created', [self._id])[0]['created']
             self._fullid = self.run(db, 'get-fullid', [self._id])[0]['full_id']
             self._date = self.run(db, 'get-date', [self._id])[0]['date']
-            ts = self.run(db, 'get-tombstoned', [self._id])
-            self._tombstoned = bool(ts[0]['tombstoned'])
+            self._tombstoned = self.run(db, 'get-tombstoned', [self._id])[0]['tombstoned']
             self.run(db, 'increment-replays', [time.time(), self._id])
             self._replayed,self._replays = self._replays_from_db(db)
             rs = self.run(db, 'oplog-version', [self._id])
@@ -266,7 +266,7 @@
             except IndexError, KeyError:
                 self._num_stored = 0
             self._to_be_stored = []
-            db.commit()
+            self._dbmgr.commit(db)
         finally:
             if db: self._dbmgr.close(db)
 
@@ -282,13 +282,13 @@
             db = self.db(); del self._db
             nreplays = self._replays_from_db(db)[1]
             if not (self._replays == nreplays):
-                raise PersistenceError("Concurrent replay detected, aborting: while replaying oplog %s, " \
+                raise ConcurrentReplayError("While replaying oplog %s, " \
                                        "#replays should have been %s, was %s" % (self._id, self._replays, nreplays))
             if self._last is not None:
                 self.run(db, 'set-last-index', [int(self._last), self._id])
             for pos in range(0,len(self._to_be_stored)):
                 self.run(db, 'insert-oplog-entry', [self._id, self._num_stored+pos, pickle.dumps(self._to_be_stored[pos]), time.time()])
-            db.commit()
+            self._dbmgr.commit(db)
             self._num_stored += len(self._to_be_stored)
             self._to_be_stored = []
         finally:
@@ -307,7 +307,7 @@
             db = self.db(); del self._db
             nreplays = self._replays_from_db(db)[1]
             if not (self._replays == nreplays):
-                raise PersistenceError("Concurrent replay detected, aborting: while replaying oplog %s, " \
+                raise ConcurrentReplayError("While replaying oplog %s, " \
                                        "#replays should have been %s, was %s" % (self._id, self._replays, nreplays))
             
             rs = self.run(db, 'all-oplog-entries',[self._id])
@@ -365,7 +365,7 @@
   replays int not null default 0,
   replayed datetime not null,
   created datetime not null,
-  tombstoned tinyint not null default 0,
+  tombstoned datetime default null,
   primary key (id)
 ) Engine=InnoDB
     """)

Modified: trunk/certified_http/oplog.py
===================================================================
--- trunk/certified_http/oplog.py	2007-12-05 01:06:57 UTC (rev 85)
+++ trunk/certified_http/oplog.py	2007-12-05 01:58:54 UTC (rev 86)
@@ -17,30 +17,30 @@
 limitations under the License.
 """
 
+from datetime import datetime
 import re
+import sha
 import time
 import uuid
 import weakref
-from certified_http import fault_injector
 
-import mulib
 from mulib import mu
+from certified_http import fault_injector
 
-import datetime
-from datetime import datetime
-
 # "long time" 15 days
 _long_time = 15 * 24 * 60 * 60
 # "clock skew" 1 hr
 _skew = 60 * 60
 
-import sha
 def simple_sha1(s):
     return sha.new(s).hexdigest()
 
 class PersistenceError(Exception):
     pass
 
+class ConcurrentReplayError(Exception):
+    pass
+
 class Monitor(mu.Resource):
     def __init__(self,persist):
         self._persist = persist

Modified: trunk/certified_http/server.py
===================================================================
--- trunk/certified_http/server.py	2007-12-05 01:06:57 UTC (rev 85)
+++ trunk/certified_http/server.py	2007-12-05 01:58:54 UTC (rev 86)
@@ -76,7 +76,7 @@
 
         # look up the version in the oplog
         if op.version() == self._version:
-            self.handle_impl(request, full_message_id, op)
+            instance = self
         else:
             try:
                 instance = self.alts[op.version()]
@@ -85,10 +85,18 @@
                     503,
                     "%s version %s is trying to replay oplog version %s"\
                     "which isn't in this list: %s." % (type(self).__name__, self._version, op.version(), self.alts.keys()))
-                    
+
+        try:
             instance.handle_impl(request, full_message_id, op)
+        except oplog.ConcurrentReplayError, sre:
+            request.response(202, reason_phrase='Concurrent replay', body='')
 
     def handle_impl(self, request, full_message_id, op):
+        if op.tombstoned():
+            raise httpd.ErrorResponse(410,
+                                      body="Gone.  Tombstoned at %s." % httpc.to_http_time(op.tombstoned()),
+                                      headers={'X-Message-Id': full_message_id})
+        
         # check that the hash is the same as in the previous request
         req_hash = request_hash(request)
         creation_date = httpc.to_http_time(time.time())

Modified: trunk/certified_http/server_test.py
===================================================================
--- trunk/certified_http/server_test.py	2007-12-05 01:06:57 UTC (rev 85)
+++ trunk/certified_http/server_test.py	2007-12-05 01:58:54 UTC (rev 86)
@@ -55,6 +55,18 @@
         incr, self.accum = oplog.persist(self.accum.next)
         req.write(str(incr) + "\n" + body)
 
+class SleepResource(server.Resource):
+    def __init__(self, persistence):
+        super(SleepResource, self).__init__(persistence)
+
+    def handle_get(self, oplog, req):
+        import sys
+        print oplog.persist(lambda: 1); sys.stdout.flush()
+        print oplog.persist(lambda: 2); sys.stdout.flush()
+        api.sleep(5)
+        print oplog.persist(lambda: 3)
+        req.write("slept for five seconds")
+
 class PrintingResource(server.Resource):
     def __init__(self,persistence):
         super(PrintingResource, self).__init__(persistence)
@@ -92,6 +104,25 @@
         self.stop_server()
         self.tearDown_persistence()
 
+    def test_concurrent_replay_failure(self):
+        date = httpdate.format_date_time(time.time())
+        
+        self.root.child_sleep = SleepResource(self.persist)
+
+        def wait_and_die(expected_status, retval):
+            status, headers, body = httpc.get_('http://localhost:9903/sleep', \
+                                               headers={'X-Message-Id':'first_sleeper', 'Date': date},
+                                               ok=(expected_status,))
+            retval.append(status)
+
+        result1 = []
+        api.spawn(wait_and_die, 202, result1)
+        api.sleep(1)
+        result2 = []
+        wait_and_die(200, result2)
+        self.assertEquals(result1, [202])
+        self.assertEquals(result2, [200])
+
     def test_versioned_servers(self):
         date = httpdate.format_date_time(time.time())
         
@@ -163,9 +194,14 @@
         httpc.delete(headers['x-message-url'])
         # second should fail
         self.assertRaises(httpc.NotFound, httpc.delete, headers['x-message-url'])
+
         # make sure the oplog is gone
-        self.assert_(not self.persist.get_oplog(oplog.simple_sha1(msgid + date))._still_restoring())
+        self.assert_(self.persist.get_oplog(oplog.simple_sha1(msgid + date)).tombstoned())
 
+        # retry the request and make sure we get a 410
+        status2, headers2, body2 = httpc.post_('http://localhost:9903/', 'echo', headers={'X-Message-Id':msgid, 'Date':date}, ok=(410,))
+        self.assertEquals(status2, 410)
+
     def test_message_id_haxxing(self):
         date = httpdate.format_date_time(time.time())
 



More information about the sldev-commits mailing list