# HG changeset patch # User Bernhard Herzog # Date 1239301204 0 # Node ID b3f9cc956accff3a3ac08ef490339e74a3d50731 # Parent a3f10658052574f282af14d9133861b88358d1a4 Make the Status class usable without a filename and provide methods to (de-)serialize to and from memory to make writing tests easier. diff -r a3f106580525 -r b3f9cc956acc test/test_status.py --- a/test/test_status.py Mon Mar 09 16:00:44 2009 +0000 +++ b/test/test_status.py Thu Apr 09 18:20:04 2009 +0000 @@ -1,4 +1,4 @@ -# Copyright (C) 2007, 2008 by Intevation GmbH +# Copyright (C) 2007, 2008, 2009 by Intevation GmbH # Authors: # Bernhard Herzog # @@ -12,8 +12,9 @@ import unittest from datetime import datetime -from treepkg.status import RevisionStatus, Status, EnumFieldDesc -from treepkg.util import ensure_directory, writefile +from treepkg.status import RevisionStatus, Status, EnumFieldDesc, \ + StringFieldDesc, DateFieldDesc +from treepkg.util import writefile from filesupport import FileTestMixin @@ -82,3 +83,51 @@ otherstatus = TestStatus(self.filename) self.assertEquals(otherstatus.status.name, "running") + + def test_serialize(self): + class TestStatus(Status): + _magic = "TestStatus 1.0\n" + + status = EnumFieldDesc() + status.add("not_running", "Process is not running", + default=True) + status.add("running", "Process is running") + status.add("error", "An error occurred") + + start = DateFieldDesc(default=None) + + name = StringFieldDesc(default="fred") + + status = TestStatus(None) + status.running() + status.start = datetime(2007, 3, 9, 17, 32, 55) + status.name = "Dave" + + self.assertEquals(sorted(status.serialize()), + ["TestStatus 1.0\n", + "name: Dave\n", + "start: 2007-03-09 17:32:55\n", + "status: running\n"]) + + def test_deserialize(self): + class TestStatus(Status): + _magic = "TestStatus 1.0\n" + + status = EnumFieldDesc() + status.add("not_running", "Process is not running", + default=True) + status.add("running", "Process is running") + status.add("error", "An error occurred") + + start = DateFieldDesc(default=None) + + name = StringFieldDesc(default="fred") + + status = TestStatus(None) + status.deserialize(iter(["TestStatus 1.0\n", + "name: Dave\n", + "start: 2007-03-09 17:32:55\n", + "status: running\n"])) + self.assertEquals(status.status.name, "running") + self.assertEquals(status.start, datetime(2007, 3, 9, 17, 32, 55)) + self.assertEquals(status.name, "Dave") diff -r a3f106580525 -r b3f9cc956acc treepkg/status.py --- a/treepkg/status.py Mon Mar 09 16:00:44 2009 +0000 +++ b/treepkg/status.py Thu Apr 09 18:20:04 2009 +0000 @@ -1,4 +1,4 @@ -# Copyright (C) 2007, 2008 by Intevation GmbH +# Copyright (C) 2007, 2008, 2009 by Intevation GmbH # Authors: # Bernhard Herzog # @@ -124,35 +124,46 @@ _attrs = set(["_filename", "_values"]) def __init__(self, filename): - assert os.path.isabs(filename) + self._values = dict() self._filename = filename - self.read() - - def _init_values(self): - self._values = {} + if self._filename is not None: + assert os.path.isabs(self._filename) + self.read() - def read(self): - self._init_values() - if not os.path.exists(self._filename): - return - f = open(self._filename) - try: - magic = f.next() - if magic != self._magic: - raise ValueError("File %r has wrong magic" % self._filename) - for line in f: - field, value = line.split(":", 1) - self._values[field] = self._fields[field].deserialize(value) - finally: - f.close() + def deserialize(self, iter_lines, filename=None): + magic = iter_lines.next() + if magic != self._magic: + message = ("Wrong magic: found %r but expected %r" + % (magic, self._magic)) + if filename is not None: + message += " (read from %r)" % filename + raise ValueError(message) + values = dict() + for line in iter_lines: + field, value = line.split(":", 1) + values[field] = self._fields[field].deserialize(value) + self._values = values - def write(self): + def serialize(self): lines = [self._magic] for field, desc in self._fields.items(): if field in self._values: lines.append("%s: %s\n" % (field, desc.serialize(self._values[field]))) - util.writefile(self._filename, "".join(lines), 0644) + return lines + + def read(self): + if not os.path.exists(self._filename): + return + f = open(self._filename) + try: + self.deserialize(f, self._filename) + finally: + f.close() + + def write(self): + if self._filename is not None: + util.writefile(self._filename, "".join(self.serialize()), 0644) def __getattr__(self, attr): desc = self._fields.get(attr)