changeset 250:b3f9cc956acc

Make the Status class usable without a filename and provide methods to (de-)serialize to and from memory to make writing tests easier.
author Bernhard Herzog <bh@intevation.de>
date Thu, 09 Apr 2009 18:20:04 +0000
parents a3f106580525
children 243f206574cb
files test/test_status.py treepkg/status.py
diffstat 2 files changed, 85 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- a/test/test_status.py	Mon Mar 09 16:00:44 2009 +0000
+++ b/test/test_status.py	Thu Apr 09 18:20:04 2009 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2007, 2008 by Intevation GmbH
+# Copyright (C) 2007, 2008, 2009 by Intevation GmbH
 # Authors:
 # Bernhard Herzog <bh@intevation.de>
 #
@@ -12,8 +12,9 @@
 import unittest
 from datetime import datetime
 
-from treepkg.status import RevisionStatus, Status, EnumFieldDesc
-from treepkg.util import ensure_directory, writefile
+from treepkg.status import RevisionStatus, Status, EnumFieldDesc, \
+     StringFieldDesc, DateFieldDesc
+from treepkg.util import writefile
 
 from filesupport import FileTestMixin
 
@@ -82,3 +83,51 @@
 
         otherstatus = TestStatus(self.filename)
         self.assertEquals(otherstatus.status.name, "running")
+
+    def test_serialize(self):
+        class TestStatus(Status):
+            _magic = "TestStatus 1.0\n"
+
+            status = EnumFieldDesc()
+            status.add("not_running", "Process is not running",
+                       default=True)
+            status.add("running", "Process is running")
+            status.add("error", "An error occurred")
+
+            start = DateFieldDesc(default=None)
+
+            name = StringFieldDesc(default="fred")
+
+        status = TestStatus(None)
+        status.running()
+        status.start = datetime(2007, 3, 9, 17, 32, 55)
+        status.name = "Dave"
+
+        self.assertEquals(sorted(status.serialize()),
+                          ["TestStatus 1.0\n",
+                           "name: Dave\n",
+                           "start: 2007-03-09 17:32:55\n",
+                           "status: running\n"])
+
+    def test_deserialize(self):
+        class TestStatus(Status):
+            _magic = "TestStatus 1.0\n"
+
+            status = EnumFieldDesc()
+            status.add("not_running", "Process is not running",
+                       default=True)
+            status.add("running", "Process is running")
+            status.add("error", "An error occurred")
+
+            start = DateFieldDesc(default=None)
+
+            name = StringFieldDesc(default="fred")
+
+        status = TestStatus(None)
+        status.deserialize(iter(["TestStatus 1.0\n",
+                                 "name: Dave\n",
+                                 "start: 2007-03-09 17:32:55\n",
+                                 "status: running\n"]))
+        self.assertEquals(status.status.name, "running")
+        self.assertEquals(status.start, datetime(2007, 3, 9, 17, 32, 55))
+        self.assertEquals(status.name, "Dave")
--- a/treepkg/status.py	Mon Mar 09 16:00:44 2009 +0000
+++ b/treepkg/status.py	Thu Apr 09 18:20:04 2009 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2007, 2008 by Intevation GmbH
+# Copyright (C) 2007, 2008, 2009 by Intevation GmbH
 # Authors:
 # Bernhard Herzog <bh@intevation.de>
 #
@@ -124,35 +124,46 @@
     _attrs = set(["_filename", "_values"])
 
     def __init__(self, filename):
-        assert os.path.isabs(filename)
+        self._values = dict()
         self._filename = filename
-        self.read()
-
-    def _init_values(self):
-        self._values = {}
+        if self._filename is not None:
+            assert os.path.isabs(self._filename)
+            self.read()
 
-    def read(self):
-        self._init_values()
-        if not os.path.exists(self._filename):
-            return
-        f = open(self._filename)
-        try:
-            magic = f.next()
-            if magic != self._magic:
-                raise ValueError("File %r has wrong magic" % self._filename)
-            for line in f:
-                field, value = line.split(":", 1)
-                self._values[field] = self._fields[field].deserialize(value)
-        finally:
-            f.close()
+    def deserialize(self, iter_lines, filename=None):
+        magic = iter_lines.next()
+        if magic != self._magic:
+            message = ("Wrong magic: found %r but expected %r"
+                       % (magic, self._magic))
+            if filename is not None:
+                message += " (read from %r)" % filename
+            raise ValueError(message)
+        values = dict()
+        for line in iter_lines:
+            field, value = line.split(":", 1)
+            values[field] = self._fields[field].deserialize(value)
+        self._values = values
 
-    def write(self):
+    def serialize(self):
         lines = [self._magic]
         for field, desc in self._fields.items():
             if field in self._values:
                 lines.append("%s: %s\n"
                              % (field, desc.serialize(self._values[field])))
-        util.writefile(self._filename, "".join(lines), 0644)
+        return lines
+
+    def read(self):
+        if not os.path.exists(self._filename):
+            return
+        f = open(self._filename)
+        try:
+            self.deserialize(f, self._filename)
+        finally:
+            f.close()
+
+    def write(self):
+        if self._filename is not None:
+            util.writefile(self._filename, "".join(self.serialize()), 0644)
 
     def __getattr__(self, attr):
         desc = self._fields.get(attr)
This site is hosted by Intevation GmbH (Datenschutzerklärung und Impressum | Privacy Policy and Imprint)