Mercurial > farol > farolluz
diff farolluz/document.py @ 26:809db989cac5
Reorganize the code in smaller mpodules
author | Benoît Allard <benoit.allard@greenbone.net> |
---|---|
date | Fri, 24 Oct 2014 17:01:26 +0200 |
parents | farolluz/cvrf.py@4004b67216a9 |
children | b15022ae484a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/farolluz/document.py Fri Oct 24 17:01:26 2014 +0200 @@ -0,0 +1,387 @@ +# -*- coding: utf-8 -*- +# +# Authors: +# BenoƮt Allard <benoit.allard@greenbone.net> +# +# Copyright: +# Copyright (C) 2014 Greenbone Networks GmbH +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + +"""\ +Objects related to CVRF Documents +""" + +from .common import ValidationError +from .producttree import CVRFProductTree, CVRFRelationship + +class CVRFPublisher(object): + TYPES = ('Vendor', 'Discoverer', 'Coordinator', 'User', 'Other') + def __init__(self, _type, vendorid=None): + self._type = _type + self._vendorid = vendorid + self._contact = None + self._authority = None + + def setContact(self, contact): + self._contact = contact + + def setAuthority(self, authority): + self._authority = authority + + def validate(self): + if not self._type: + raise ValidationError('Document Publisher needs to have a type') + if self._type not in self.TYPES: + raise ValidationError('Document Publisher Type needs to be one of %s' % ', '.join(self.TYPES)) + + def __str__(self): + s = 'CVRFPublisher: %s' % self._type + if self._vendorid is not None: + s += ' ID: %s' % self._vendorid + if self._contact is not None: + s += ' Contact: "%s"' % self._contact + if self._authority is not None: + s += ' Authority: "%s"' % self._authority + return s + + +class CVRFTrackingID(object): + def __init__(self, _id): + self._id = _id + self._aliases = [] + + def addAlias(self, alias): + self._aliases.append(alias) + + def getId(self): + return self._id + + def validate(self): + if not self._id: + raise ValidationError('Document ID cannot be left empty') + + def __str__(self): + if self._aliases: + return "%s (%s)" % (self._id, ', '.join(self._aliases)) + return self._id + + +class CVRFTracking(object): + STATUSES = ('Draft', 'Interim', 'Final') + def __init__(self, _id, status, version, initial, current): + self._identification = _id + self._status = status + self._version = version + self._history = [] + self._initialDate = initial + self._currentDate = current + self._generator = None + + def addRevision(self, revision): + self._history.append(revision) + + def setGenerator(self, generator): + self._generator = generator + + def getId(self): + return self._identification.getId() + + def validate(self): + if self._identification is None: + raise ValidationError('Document Tracking needs to have an Identification') + self._identification.validate() + if not self._status: + raise ValidationError('Document status must be set') + if self._status not in self.STATUSES: + raise ValidationError('Document Status must be one of %s' % ', '.join(self.STATUSES)) + if not self._version: + raise ValidationError('Document Version must be set') + if len(self._version) > 4: + raise ValidationError('Document Version must be comprised between `nn` and `nn.nn.nn.nn`') + if not self._history: + raise ValidationError('Document must have at least a revision') + if not self._initialDate: + raise ValidationError('Document must have an initial Release date set') + prev_date = self._initialDate + if self._history[0]._date < self._initialDate: + # Documents could have revisions before being released + prev_date = self._history[0]._date + prev = () + for revision in self._history: + revision.validate() + if revision._number <= prev: + raise ValidationError('Revision numbers must always be increasing') + if revision._date < prev_date: + raise ValidationError('Revision dates must always be increasing') + prev = revision._number + prev_date = revision._date + if not self._currentDate: + raise ValidationError('Document must have a Current Release Date set') + if self._currentDate != self._history[-1]._date: + raise ValidationError('Current Release Date must be the same as the Date from the last Revision') + if self._initialDate > self._currentDate: + raise ValidationError('Initial date must not be after current Date') + if self._version != self._history[-1]._number: + raise ValidationError('Document version must be the same as the number of the last Revision') + + def __str__(self): + s = "ID: %s" % self._identification + s += " Status: %s" % self._status + s += " v%s" % '.'.join('%d' % i for i in self._version) + s += " %d revisions" % len(self._history) + s += " Initial release: %s" % self._initialDate.isoformat() + return s + + +class CVRFRevision(object): + def __init__(self, number, date, description): + self._number = number + self._date = date + self._description = description + + def validate(self): + if not self._number: + raise ValidationError('A Revision must have a Number') + if not self._date: + raise ValidationError('A Revision must have a Date') + if not self._description: + raise ValidationError('A Revision must have a Description') + +class CVRFGenerator(object): + def __init__(self): + self._engine = None + self._date = None + + def setEngine(self, engine): + self._engine = engine + + def setDate(self, date): + self._date = date + + def validate(self): + if (not self._engine) and (not self._date): + raise ValidationError('The Generator must have at least an Engine or a Date') + + +class CVRFAggregateSeverity(object): + def __init__(self, severity): + self._severity = severity + self._namespace = None + + def setNamespace(self, namespace): + self._namespace = namespace + + +class CVRF(object): + def __init__(self, title, _type): + self._title = title + self._type = _type + self._publisher = None + self._tracking = None + self._notes = [] + self._distribution = None + self._aggregateseverity = None + self._references = [] + self._acknowledgments = [] + self._producttree = None + self._vulnerabilities = [] + + def setPublisher(self, publisher): + self._publisher = publisher + + def setTracking(self, tracking): + self._tracking = tracking + + def addNote(self, note): + self._notes.append(note) + + def setDistribution(self, distribution): + self._distribution = distribution + + def setAggregateSeverity(self, aggregateseverity): + self._aggregateseverity = aggregateseverity + + def addReference(self, ref): + self._references.append(ref) + + def addAcknowledgment(self, ack): + self._acknowledgments.append(ack) + + def createProductTree(self): + """ only done if the element is there """ + self._producttree = CVRFProductTree() + return self._producttree + + def addVulnerability(self, vuln): + self._vulnerabilities.append(vuln) + + def getProductForID(self, productid): + if self._producttree is None: + raise ValueError('No ProductTree') + return self._producttree.getProductForID(productid) + + def getGroupForID(self, groupid): + if self._producttree is None: + raise ValueError('No ProductTree') + return self._producttree.getGroupForID(groupid) + + def getHighestCVSS(self): + highestBaseScore = 0 + highest = None + for vulnerability in self._vulnerabilities: + for cvss in vulnerability._cvsss: + if cvss._basescore <= highestBaseScore: + continue + highestBaseScore = cvss._basescore + highest = cvss + return highest + + def getProductList(self, type_='Fixed'): + products = set() + if type_ == 'Fixed': + # First try through the Remediation + for vulnerability in self._vulnerabilities: + for remediation in vulnerability._remediations: + if remediation._type != 'Vendor Fix': + continue + for productid in remediation._productids: + products.add(productid) + for groupid in remediation._groupids: + for productid in self.getGroupForID(groupid)._productids: + products.add(productid) + if not products: + # If nothing there, try through the productstatuses + for vulnerability in self._vulnerabilities: + for status in vulnerability._productstatuses: + if status._type != type_: + continue + for productid in status._productids: + products.add(productid) + return set(self.getProductForID(p) for p in products) + + def mentionsProductId(self, productid): + # We first look at the ProductTree + ptree = self._producttree + for relation in ptree._relationships: + if productid == relation._productreference: + yield relation + elif productid == relation._relatestoproductreference: + yield relation + # Then go through the groups + for group in ptree._groups: + if productid in group._productids: + yield group + # Finally, go through all the Vulnerabilities + for vulnerability in self._vulnerabilities: + for item in vulnerability.mentionsProdId(productid): + yield item + + def isProductOrphan(self, productid): + """ Returns if a productid is mentioned nowhere in the document """ + for item in self.mentionsProductId(productid): + return True + return False + + def changeProductID(self, old, new): + for item in self.mentionsProductId(old): + if isinstance(item, CVRFRelationship): + if old == item._productreference: + item._productreference = new + elif old == item._relatestoproductreference: + item._relatestoproductreference = new + else: + item._productids.remove(old) + item._productids.append(new) + + def isGroupOrphan(self, groupid): + """ Returns if a group can be safely deleted """ + for vulnerability in self._vulnerabilities: + if vulnerability.isMentioningGroupId(groupid): + return False + return True + + def isProductTreeOrphan(self): + """ Difference with the previous method is that we don;t care about + inter-producttree references """ + for vulnerability in self._vulnerabilities: + for product in self._producttree._products: + if vulnerability.isMentioningProdId(product._productid): + return False + for group in self._producttree._groups: + if vulnerability.isMentioningGroupId(group._groupid): + return False + return True + + def getNote(self, ordinal): + for note in self._notes: + if note._ordinal == ordinal: + return note + return None + + def getDocId(self): + if self._tracking is not None: + return self._tracking.getId() + # Make up something ... + return self._title.lower() + + def validate(self): + if not self._title: + raise ValidationError('Document Title cannot be empty') + if not self._type: + raise ValidationError('Document Type cannot be empty') + if self._publisher is None: + raise ValidationError('Document Publisher needs to be set') + self._publisher.validate() + if self._tracking is None: + raise ValidationError('Document Tracking needs to be set') + self._tracking.validate() + ordinals = set() + for note in self._notes: + note.validate() + if note._ordinal in ordinals: + raise ValidationError('Document Note ordinal %d is issued twice' % note._ordinal) + ordinals.add(note._ordinal) + for reference in self._references: + reference.validate() + for acknowledgment in self._acknowledgments: + acknowledgment.validate() + productids = set() + groupids = set() + if self._producttree: + productids, groupids = self._producttree.validate() + ordinals = set() + for vulnerability in self._vulnerabilities: + vulnerability.validate(productids, groupids) + if vulnerability._ordinal in ordinals: + raise ValidationError('Vulnerability ordinal %d is issued twice' % vulnerability._ordinal) + ordinals.add(vulnerability._ordinal) + + def __str__(self): + s = [ + 'Title: %s' % self._title, + 'Type: %s' % self._type, + 'Publisher: %s' % self._publisher, + 'tracking: %s' % self._tracking, + '%d Notes: %s' % (len(self._notes), ', '.join( + str(n) for n in self._notes)) + ] + if self._distribution is not None: + s.append('Distribution: %s' % self._distribution) + s.extend([ + '%d Acknowledgments' % len(self._acknowledgments), + 'Products: %s' % self._producttree, + ]) + return '\n'.join(s)