Mercurial > odfcast > odfcast
view odfcast/convert.py @ 99:349d49bb69f4
Porting on python3
author | Magnus Schieder <mschieder@intevation.de> |
---|---|
date | Tue, 19 Jun 2018 15:07:56 +0200 |
parents | b2f96072b8d7 |
children |
line wrap: on
line source
# -*- coding: utf-8 -*-
"""Flask views for converting, merging and checking uploaded documents.

``ConvertView`` optionally renders an uploaded template with py3o and
converts it through a pluggable convertor driver, ``MergeView`` merges
uploaded PDF files with PyPDF2 and ``CheckView`` tests whether a single
PDF can be merged.  Failures are reported through ``ErrorResponse`` and
its subclasses, as JSON or HTML depending on the request's Accept header.
"""

import logging
import tempfile

from flask import request, Response, json, render_template
from flask.views import MethodView

from py3o.template import Template

from PyPDF2 import PdfFileMerger
from PyPDF2.utils import PyPdfError

from werkzeug.utils import escape

log = logging.getLogger(__name__)

# Target formats accepted by ConvertView (compared case-insensitively).
ALLOWED_FORMATS = ["pdf", "doc", "docx", "odt"]

PDF_MIMETYPE = "application/pdf"
JSON_MIMETYPE = "application/json"
HTML_MIMETYPE = "text/html"

# Response mimetype for each allowed target format.
MIMETYPES = {
    "odt": "application/vnd.oasis.opendocument.text",
    "doc": "application/msword",
    "docx": "application/vnd.openxmlformats-officedocument"
            ".wordprocessingml.document",
    "pdf": PDF_MIMETYPE,
}

# Fallback mimetype for formats without an entry in MIMETYPES.
DEFAULT_MIMETYPE = "application/octet-stream"


class ErrorResponse(Response):
    """Error response rendered as JSON or HTML depending on the request.

    The body carries a title, an application specific error code and a
    detail message; the HTTP status is given separately.
    """

    BAD_REQUEST_ERROR_CODE = 400

    def __init__(self, title, error_code, details,
                 http_error_code=BAD_REQUEST_ERROR_CODE):
        """Build the response body and initialise the response.

        :param title: short name of the error.
        :param error_code: application specific error code.
        :param details: human readable description of the error.
        :param http_error_code: HTTP status of the response.
        """
        data, mime_type = self.get_response_data(title, error_code, details)
        super(ErrorResponse, self).__init__(response=data,
                                            mimetype=mime_type,
                                            status=http_error_code)

    def json(self, title, error_code, details):
        """Return ``(body, mimetype)`` with the error serialised as JSON."""
        return json.dumps({
            "error": title,
            "error_code": error_code,
            "details": details,
        }), JSON_MIMETYPE

    def html(self, title, error_code, details):
        """Return ``(body, mimetype)`` with the error as an HTML page.

        Title and details are escaped before they are put into the markup.
        """
        data = (
            '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n'
            '<title>%(code)s %(name)s</title>\n'
            '<h1>%(name)s</h1>\n'
            '%(details)s\n'
        ) % {
            "code": error_code,
            "name": escape(title),
            "details": escape(details),
        }
        return data, HTML_MIMETYPE

    def get_response_data(self, title, error_code, details):
        """Return ``(body, mimetype)`` in the representation the client
        prefers (see ``is_wants_json``)."""
        if self.is_wants_json():
            return self.json(title, error_code, details)
        return self.html(title, error_code, details)

    def is_wants_json(self):
        """Return True if the client prefers JSON over HTML.

        JSON is only chosen when it is the best match of the Accept header
        and its quality is strictly higher than the quality of HTML.
        """
        best = request.accept_mimetypes.best_match([JSON_MIMETYPE,
                                                    HTML_MIMETYPE])
        return best == JSON_MIMETYPE and \
            request.accept_mimetypes[best] > \
            request.accept_mimetypes[HTML_MIMETYPE]


class TemplateErrorResponse(ErrorResponse):
    """Error response (HTTP 500) for failures while rendering a template."""

    TEMPLATE_ERROR_CODE = 100

    def __init__(self, details, error_code=TEMPLATE_ERROR_CODE):
        super(TemplateErrorResponse, self).__init__(
            title="TemplateError", error_code=error_code,
            details=details, http_error_code=500)


class ConversionErrorResponse(ErrorResponse):
    """Error response (HTTP 500) for failures while converting a file."""

    CONVERSION_ERROR_CODE = 200

    def __init__(self, details, error_code=CONVERSION_ERROR_CODE):
        super(ConversionErrorResponse, self).__init__(
            title="ConversionError", error_code=error_code,
            details=details, http_error_code=500)


class MergeErrorResponse(ErrorResponse):
    """Error response for failures while merging PDF files.

    The HTTP status defaults to 500 but can be overridden by the caller.
    """

    MERGE_ERROR_CODE = 300

    def __init__(self, details, error_code=MERGE_ERROR_CODE,
                 http_error_code=500):
        super(MergeErrorResponse, self).__init__(
            title="MergeError", error_code=error_code,
            details=details, http_error_code=http_error_code)


class ConvertView(MethodView):
    """View converting an uploaded document into another format."""

    def __init__(self, pyuno_driver_name="", hostname="localhost", port=2001):
        """Load the driver module and create its convertor.

        :param pyuno_driver_name: importable name of the module that
            provides a ``Convertor`` class.
        :param hostname: passed to the ``Convertor`` constructor.
        :param port: passed to the ``Convertor`` constructor.
        """
        driver_module = self._load_driver_module(pyuno_driver_name)
        self.convertor = driver_module.Convertor(hostname, port)

    def _load_driver_module(self, pyuno_driver_name):
        """Import and return the driver module with the given name."""
        # A non-empty fromlist makes __import__ return the named module
        # itself instead of its top-level package.
        return __import__(pyuno_driver_name, globals(), locals(),
                          ["Convertor"])

    def is_format_supported(self, fformat):
        """Return True if ``fformat`` is an allowed format.

        The comparison is case-insensitive; empty or missing values are
        not supported.
        """
        return bool(fformat) and fformat.lower() in ALLOWED_FORMATS

    def post(self):
        """Convert the uploaded file to the requested format.

        Reads the upload from the ``file`` field, the target format from
        the ``format`` form field and optional template data from the
        ``datadict`` form field.  If template data is given the upload is
        rendered as a py3o template first.  The result is converted unless
        the target format is ``odt``.

        :returns: the resulting document, or an ``ErrorResponse`` if the
            input is invalid or rendering/conversion fails.
        """
        log.debug("Converting document")

        # Use .get() so that a completely missing field is reported with
        # the same structured error as an empty upload.
        ffile = request.files.get('file')
        if ffile is None or not ffile.filename:
            return ErrorResponse(
                "Upload file missing",
                error_code=101,
                details="Please upload a file for conversion",
                http_error_code=400)

        fformat = request.form.get('format')
        if not self.is_format_supported(fformat):
            return ErrorResponse(
                "Invalid format",
                error_code=102,
                details="Format %s not allowed" % fformat,
                http_error_code=400)

        # The format check is case-insensitive.  Normalise the value so
        # that the mimetype lookup and the "odt" comparison below agree
        # with it.
        fformat = fformat.lower()

        datadict = self.get_datadict()
        if datadict:
            log.debug("  with datadict")

        mimetype = self.get_mimetype_for_format(fformat)
        log.debug("  to %s", fformat)

        outfile = self.save_form_file(ffile)

        if datadict:
            try:
                rendered = self._render_template(outfile, datadict)
            except Exception as e:
                log.exception("Template error")
                outfile.close()
                return TemplateErrorResponse(details=str(e))
            outfile.close()
            outfile = rendered

        if fformat != "odt":
            try:
                outfile = self.convert(outfile, fformat)
            except Exception as e:
                log.exception("Conversion error")
                # The assignment above did not happen, so this is still
                # the input file of the failed conversion.
                outfile.close()
                return ConversionErrorResponse(details=str(e))

        log.debug("Document converted")
        return Response(outfile, mimetype=mimetype)

    def get(self):
        """Render the conversion form."""
        return render_template("convert.html")

    def _render_template(self, infile, datadict):
        """Render ``infile`` as a py3o template into a new temporary file.

        :returns: the rendered temporary file, positioned at its start.
            On failure the temporary file is closed and the exception is
            re-raised.
        """
        rendered = tempfile.NamedTemporaryFile()
        try:
            template = Template(infile, rendered,
                                ignore_undefined_variables=True)
            template.render(datadict)
            rendered.seek(0)
        except Exception:
            rendered.close()
            raise
        return rendered

    def save_form_file(self, infile):
        """Save the uploaded ``infile`` to a named temporary file.

        The upload is closed afterwards.

        :returns: the temporary file, positioned at its start.
        """
        outfile = tempfile.NamedTemporaryFile()
        infile.save(outfile)
        infile.close()
        outfile.seek(0)
        return outfile

    def convert(self, infile, fformat):
        """Convert ``infile`` to ``fformat`` using the convertor.

        ``infile`` is closed after a successful conversion.  On failure
        the output file is closed and the exception is re-raised;
        ``infile`` is left open for the caller.

        :returns: a named temporary file the convertor was told to write
            the result to.
        """
        outfile = tempfile.NamedTemporaryFile()
        try:
            self.convertor.convert(infile.name, outfile.name, fformat)
        except Exception:
            outfile.close()
            raise
        infile.close()
        return outfile

    def get_mimetype_for_format(self, fformat):
        """Return the mimetype for ``fformat`` (case-insensitive).

        Formats without an entry in ``MIMETYPES`` get ``DEFAULT_MIMETYPE``.
        """
        key = fformat.lower() if fformat else fformat
        return MIMETYPES.get(key, DEFAULT_MIMETYPE)

    def get_datadict(self):
        """Return the decoded ``datadict`` form field.

        :returns: the decoded JSON value, or None if the field is missing
            or empty.  Invalid JSON is not handled here; the exception of
            ``json.loads`` propagates to the caller.
        """
        raw = request.form.get('datadict')
        if not raw:
            return None
        return json.loads(raw)


class MergeView(MethodView):
    """View merging several uploaded PDF files into one."""

    def get(self):
        """Render the merge form."""
        return render_template("merge.html")

    def post(self):
        """Merge all uploaded files into a single PDF document.

        The files may have arbitrary form names; they are merged in the
        case-insensitive order of these names.  A file that cannot be
        appended aborts the merge unless ``ignore_file_errors`` is set,
        in which case it is skipped.

        :returns: the merged PDF, or a ``MergeErrorResponse`` on failure.
        """
        log.debug("Merging PDF documents")
        merger = PdfFileMerger(strict=False)

        # allow files to have arbitrary form names
        # order files by their form names
        ffiles = []
        for _, values in sorted(request.files.lists(),
                                key=lambda item: item[0].lower()):
            ffiles.extend(values)

        # try/finally ensures the merger is closed on every exit path.
        try:
            for ffile in ffiles:
                try:
                    merger.append(ffile, import_bookmarks=False)
                except Exception as e:
                    log.exception("Error merging file %s", ffile)
                    if self.is_ignore_file_errors():
                        continue
                    return MergeErrorResponse(details=str(e))

            outfile = tempfile.NamedTemporaryFile()
            try:
                merger.write(outfile)
                outfile.seek(0)
            except PyPdfError as e:
                log.exception("Merge error")
                outfile.close()
                return MergeErrorResponse(details=str(e))
        finally:
            merger.close()

        log.debug("PDF documents merged")
        return Response(outfile, mimetype=PDF_MIMETYPE)

    def is_ignore_file_errors(self):
        """Return a truthy value if files that fail to merge are skipped.

        The flag is read from the ``ignore_file_errors`` query or form
        parameter.  Any non-empty value enables it, including strings
        such as "false" or "0".
        """
        return request.args.get("ignore_file_errors", False) or \
            request.form.get("ignore_file_errors", False)


class CheckView(MethodView):
    """View checking whether an uploaded PDF file can be merged."""

    def get(self):
        """Render the check form."""
        return render_template("check.html")

    def post(self):
        """Check that the attached PDF file is ready for merging.

        If it is not ready a MergeErrorResponse is returned with
        http_error_code=422.  The default error code of 500 is not really
        sensible because it is not an internal server error if the
        attachment cannot be merged.  The code 422 is used in WebDAV with
        the meaning "Unprocessable Entity" which fits relatively well.
        """
        log.debug("Checking a PDF document's readiness for merging")

        # Use .get() so that a completely missing field is reported with
        # the same structured error as an empty upload.
        ffile = request.files.get('file')
        if ffile is None or not ffile.filename:
            return ErrorResponse(
                "Upload file missing",
                error_code=101,
                details="Please upload a file for conversion",
                http_error_code=400)

        merger = PdfFileMerger(strict=False)
        # try/finally ensures the merger is closed on the error paths too.
        try:
            with tempfile.TemporaryFile() as outfile:
                try:
                    merger.append(ffile, import_bookmarks=False)
                except Exception as e:
                    log.exception("Error testing merger.append of %s", ffile)
                    return MergeErrorResponse(details=str(e),
                                              http_error_code=422)

                try:
                    merger.write(outfile)
                except Exception as e:
                    log.exception("Error testing merger.write of merged %s",
                                  ffile)
                    return MergeErrorResponse(details=str(e),
                                              http_error_code=422)
        finally:
            merger.close()

        log.debug("PDF document %s checked.", ffile)
        return Response("Okay.")


class TemplateView(MethodView):
    """View rendering a fixed template on GET requests."""

    # Name of the template to render; may be overridden per instance.
    template_name = ""

    def __init__(self, template_name=None):
        """Optionally override the class level ``template_name``."""
        if template_name:
            self.template_name = template_name

    def get_template_name(self):
        """Return the name of the template to render."""
        return self.template_name

    def get(self):
        """Render the configured template."""
        return render_template(self.get_template_name())