330 lines
12 KiB
Python
330 lines
12 KiB
Python
|
|
import base64
import hmac
import html
import io
import logging
import secrets
from datetime import timedelta
from urllib.parse import quote

from odoo import api, fields, models
from odoo.http import request
|
||
|
|
|
||
|
|
_logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class DocApprovalDocumentPackageCustom(models.Model):
    # Extends the existing document package model in place (no new model name).
    _inherit = 'xf.doc.approval.document.package'

    # --- Classification / request metadata ---------------------------------
    document_type = fields.Selection(
        [
            ('request', 'សំណើ'),
            ('report', 'របាយការណ៍'),
            ('letter', 'លិខិត'),
            ('other', 'ផ្សេងៗ'),
        ],
        string='Document Type',
    )
    decision_requester_id = fields.Many2one(
        'res.users',
        string='Request Decision From',
    )
    reference_note = fields.Text('Reference Note')

    # Workflow states with Khmer labels.
    state = fields.Selection(
        [
            ('draft', 'បង្កើតលិខិតឯកសារ'),
            ('approval', 'ការពិនិត្យនិងផ្ដល់យោបល់'),
            ('approved', 'ការអនុម័ត'),
            ('cancelled', 'បានលុបចោល'),
            ('rejected', 'បានបដិសេធ'),
        ],
    )

    document_source = fields.Selection(
        [
            ('create', 'បង្កើតលំហូរឯកសារ'),
            ('template', 'ជ្រើសរើសគំរូលំហូរឯកសារ'),
        ],
        string='Document Source',
        default='create',
    )
    doc_number = fields.Char('Document Number', copy=False)

    # --- Attachments ---------------------------------------------------------
    # Both relations target ir.attachment but use separate relation tables.
    upload_file_ids = fields.Many2many(
        comodel_name='ir.attachment',
        relation='xf_doc_package_upload_rel',
        column1='package_id',
        column2='attachment_id',
        string='ឯកសារ',
    )
    reference_file_ids = fields.Many2many(
        comodel_name='ir.attachment',
        relation='xf_doc_package_ref_att_rel',
        column1='package_id',
        column2='attachment_id',
        string='ឯកសារយោង',
    )
    description = fields.Html(
        'Description',
        translate=True,
        sanitize=False,
    )

    # --- QR code and its access control -------------------------------------
    qr_code = fields.Image(
        'QR Code',
        max_width=0,
        max_height=0,
    )

    # Fields for secure QR access
    qr_access_token = fields.Char(
        'QR Access Token',
        copy=False,
        readonly=True,
        help='Secure token for QR code access',
    )
    qr_token_expiry = fields.Datetime(
        'QR Token Expiry',
        copy=False,
        readonly=True,
        help='Token expiration date (optional, leave empty for no expiry)',
    )
    authorized_user_ids = fields.Many2many(
        comodel_name='res.users',
        relation='xf_doc_package_authorized_users_rel',
        column1='package_id',
        column2='user_id',
        string='Authorized Users',
        help='Users who can access this document via QR code',
    )

    # --- Computed HTML file lists (filled by the compute methods named here) -
    document_files_html = fields.Html(
        compute='_compute_document_files_html',
        sanitize=False,
    )
    reference_files_html = fields.Html(
        compute='_compute_reference_files_html',
        sanitize=False,
    )
|
||
|
|
|
||
|
|
@api.depends('document_ids.file_name', 'document_ids.name', 'document_ids.is_reference')
def _compute_document_files_html(self):
    """Render each package's non-reference documents as an HTML link list.

    ``document_ids.name`` is listed as a dependency because
    ``_build_file_html`` falls back to ``name`` when ``file_name`` is empty,
    so a rename must also refresh the rendered list.
    """
    for rec in self:
        # Only regular documents; reference documents are rendered separately.
        docs = rec.document_ids.filtered(lambda d: not d.is_reference)
        rec.document_files_html = self._build_file_html(docs)
|
||
|
|
|
||
|
|
@api.depends(
    'document_ids.file_name',
    'document_ids.name',
    'document_ids.is_reference',
    'reference_file_ids',
)
def _compute_reference_files_html(self):
    """Render each package's reference files as an HTML link list.

    Documents flagged ``is_reference`` are preferred. When a package has
    none, the attachments in ``reference_file_ids`` are used instead (read
    through ``sudo()``), and the field is set to ``False`` when there are no
    attachments either.

    Attachment names are HTML-escaped because the target field is declared
    with ``sanitize=False`` and would otherwise emit the name as raw markup.
    """
    for rec in self:
        # Use document_ids (is_reference=True) — works for all users
        docs = rec.document_ids.filtered(lambda d: d.is_reference)
        if docs:
            rec.reference_files_html = self._build_file_html(docs)
            continue

        # Fallback for old documents: read reference_file_ids via sudo
        atts = rec.sudo().reference_file_ids
        if not atts:
            rec.reference_files_html = False
            continue

        rows = ''.join(
            '<div class="o_xf_file_item">'
            '<i class="fa fa-file-o o_xf_file_icon"></i>'
            f'<a href="/web/content/{att.id}?download=true" target="_blank">'
            f'{html.escape(att.name or "")}'
            '</a>'
            '</div>'
            for att in atts
        )
        rec.reference_files_html = f'<div class="o_xf_files_list">{rows}</div>'
|
||
|
|
|
||
|
|
@staticmethod
def _build_file_html(docs):
    """Build the HTML link list for a collection of document records.

    :param docs: iterable of records exposing ``id``, ``file_name`` and
        ``name``
    :return: ``False`` when ``docs`` is empty, otherwise an HTML string with
        one link per document

    The display name (``file_name``, falling back to ``name``, then to an
    empty string) is HTML-escaped for the link text and percent-encoded for
    the URL path, so characters such as ``<``, ``&``, ``"``, ``?`` or ``#``
    in a file name can neither inject markup nor break the link.
    """
    if not docs:
        return False

    rows = []
    for doc in docs:
        label = doc.file_name or doc.name or ''
        # safe='' also encodes '/', keeping the name a single path segment.
        url_name = quote(label, safe='')
        rows.append(
            '<div class="o_xf_file_item">'
            '<i class="fa fa-file-o o_xf_file_icon"></i>'
            f'<a href="/web/content/xf.doc.approval.document/{doc.id}/file/{url_name}" target="_blank">'
            f'{html.escape(label)}'
            '</a>'
            '</div>'
        )
    return f'<div class="o_xf_files_list">{"".join(rows)}</div>'
|
||
|
|
|
||
|
|
@api.onchange('upload_file_ids')
def _onchange_document_ids_name(self):
    """Propose a document number from the first uploaded file's name.

    Does nothing when no file is uploaded or a document number is already
    set. The last extension is stripped for a cleaner number; a name whose
    only dot is the leading one (e.g. ``.env``) is kept whole, since
    stripping it would leave an empty document number.
    """
    if not self.upload_file_ids or self.doc_number:
        return

    name = self.upload_file_ids[0].name or ''
    # rpartition gives an empty stem both for "no dot" and for "leading dot
    # only"; in either case keep the full name.
    stem, _dot, _ext = name.rpartition('.')
    self.doc_number = stem or name
|
||
|
|
|
||
|
|
def _generate_access_token(self):
    """Return a new URL-safe random token built from 32 random bytes."""
    token_bytes = 32
    return secrets.token_urlsafe(token_bytes)
|
||
|
|
|
||
|
|
def _generate_qr_code(self):
    """Generate (or clear) the QR code image of every record in ``self``.

    A record gets a QR code only when its state is ``'approved'`` and it has
    a ``doc_number``; any other record has ``qr_code`` reset to ``False``.
    The encoded URL has the form
    ``<web.base.url>/document/access/<record id>/<access token>``; an access
    token is created when the record has none yet.

    Generation is best-effort: any exception, including a missing ``qrcode``
    package, is logged and leaves ``qr_code`` cleared for that record
    instead of propagating to the caller.
    """
    base_url = None  # looked up once, on the first record that needs it

    for record in self:
        if record.state != 'approved' or not record.doc_number:
            record.write({'qr_code': False})
            continue

        try:
            # Imported lazily so records that need no QR code do not require
            # the package, and so its absence is handled like any other
            # generation failure.
            import qrcode

            # Generate or reuse access token
            if not record.qr_access_token:
                record.qr_access_token = record._generate_access_token()

            if base_url is None:
                configured_url = self.env['ir.config_parameter'].sudo().get_param(
                    'web.base.url', 'http://localhost:8069'
                )
                # Drop trailing slashes so the path below never starts with '//'.
                base_url = (configured_url or '').rstrip('/')

            # URL format: /document/access/<package_id>/<token>
            content = f"{base_url}/document/access/{record.id}/{record.qr_access_token}"

            qr = qrcode.QRCode(
                version=None,
                error_correction=qrcode.constants.ERROR_CORRECT_L,
                box_size=15,
                border=4,
            )
            qr.add_data(content)
            qr.make(fit=True)
            img = qr.make_image(fill_color='black', back_color='white')

            buffer = io.BytesIO()
            img.save(buffer, format='PNG')
            record.write({'qr_code': base64.b64encode(buffer.getvalue())})
            _logger.info(
                'Secure QR code generated for record %s (package_id: %s)',
                record.id, record.id
            )
        except Exception:
            _logger.exception('QR code generation failed for record %s', record.id)
            record.write({'qr_code': False})
|
||
|
|
|
||
|
|
def check_qr_access(self, token, user=None):
    """Check if a user is authorized to access this document via QR code.

    Checks run in this order:

    1. ``token`` must equal the record's ``qr_access_token``.
    2. If ``qr_token_expiry`` is set, it must not lie in the past.
    3. Members of ``base.group_system`` are always allowed.
    4. If ``authorized_user_ids`` is empty the document is treated as
       public; otherwise ``user`` must be in that list.

    :param token: The access token from QR code
    :param user: User record (defaults to current user)
    :return: Boolean indicating access permission
    """
    self.ensure_one()

    if not user:
        user = self.env.user

    # Validate token. compare_digest runs in constant time, so response
    # timing does not reveal how much of a guessed token was correct. Both
    # sides are encoded to bytes because compare_digest rejects non-ASCII
    # str, and a token taken from a URL may contain any characters.
    stored_token = self.qr_access_token
    token_is_valid = (
        isinstance(token, str)
        and bool(token)
        and bool(stored_token)
        and hmac.compare_digest(token.encode('utf-8'), stored_token.encode('utf-8'))
    )
    if not token_is_valid:
        _logger.warning(
            'Invalid QR token attempt for package %s by user %s',
            self.id, user.id
        )
        return False

    # Check token expiry if set
    if self.qr_token_expiry and fields.Datetime.now() > self.qr_token_expiry:
        _logger.warning(
            'Expired QR token for package %s (expired: %s)',
            self.id, self.qr_token_expiry
        )
        return False

    # Admin always has access
    if user.has_group('base.group_system'):
        return True

    # If authorized_user_ids is empty, consider document as public.
    # Change this logic if you want to restrict by default
    # (return False here instead).
    if not self.authorized_user_ids:
        _logger.info(
            'Public document access via QR for package %s by user %s',
            self.id, user.id
        )
        return True

    # Check if user is in authorized list
    if user not in self.authorized_user_ids:
        _logger.warning(
            'Unauthorized QR access attempt for package %s by user %s (%s)',
            self.id, user.id, user.name
        )
        return False

    return True
|
||
|
|
|
||
|
|
def action_finish_approval(self):
    """Run the parent finish step, then create QR codes for approved records.

    Returns whatever the parent implementation returned.
    """
    result = super().action_finish_approval()

    # Only records whose state is 'approved' after the parent call get a QR code.
    approved_packages = self.filtered(lambda package: package.state == 'approved')
    if approved_packages:
        approved_packages._generate_qr_code()

    return result
|
||
|
|
|
||
|
|
def action_regenerate_qr_token(self):
    """Regenerate QR access token (useful if token is compromised).

    Each record receives a new token and then has its QR code rebuilt.
    Returns a client action that shows a non-sticky success notification.
    """
    for package in self:
        fresh_token = package._generate_access_token()
        package.qr_access_token = fresh_token
        # Rebuild after the token is stored so the QR encodes the new token.
        package._generate_qr_code()

    notification = {
        'title': 'Success',
        'message': 'QR access token regenerated successfully',
        'type': 'success',
        'sticky': False,
    }
    return {
        'type': 'ir.actions.client',
        'tag': 'display_notification',
        'params': notification,
    }
|
||
|
|
|
||
|
|
def action_send_for_approval(self):
    """Prepare each package for approval, then delegate to the parent.

    For every record:

    * assign a document number from the ``ir.sequence`` with code
      ``xf.doc.approval.document.package`` when none is set;
    * create a ``document_ids`` line for every attachment in
      ``upload_file_ids`` (``is_reference=False``) and in
      ``reference_file_ids`` (``is_reference=True``) whose name is not
      already used as ``file_name`` by a line of the same kind;
    * re-link the reference attachments to this record.
    """
    for package in self:
        # Auto document number
        if not package.doc_number:
            sequence_number = self.env['ir.sequence'].next_by_code(
                'xf.doc.approval.document.package'
            )
            package.doc_number = sequence_number or ''

        # Sync both attachment fields into document_ids: uploads first, then
        # references, each de-duplicated against existing lines of its kind.
        attachment_sources = (
            (False, package.upload_file_ids),
            (True, package.reference_file_ids),
        )
        create_commands = []
        for as_reference, attachments in attachment_sources:
            known_names = set(
                package.document_ids
                .filtered(lambda doc: bool(doc.is_reference) == as_reference)
                .mapped('file_name')
            )
            create_commands.extend(
                (0, 0, {
                    'name': attachment.name,
                    'file': attachment.datas,
                    'file_name': attachment.name,
                    'is_reference': as_reference,
                })
                for attachment in attachments
                if attachment.name not in known_names
            )

        if create_commands:
            package.sudo().write({'document_ids': create_commands})

        # Ensure reference attachments are accessible to all users
        reference_attachments = package.reference_file_ids
        if reference_attachments:
            reference_attachments.sudo().write({
                'res_model': self._name,
                'res_id': package.id,
            })

    return super().action_send_for_approval()
|
||
|
|
|
||
|
|
def action_draft(self):
    """Run the parent draft step, then start a new round for all approvers.

    For each record with approvers, every approver's ``round`` is set to the
    highest existing round plus one and ``sent_date`` is cleared. Returns
    whatever the parent implementation returned.
    """
    result = super().action_draft()

    # Bump round for all approvers on re-submission
    for package in self:
        approvers = package.approver_ids
        if not approvers:
            continue
        # Fall back to [1] when mapped() yields an empty list.
        known_rounds = approvers.mapped('round') or [1]
        approvers.write({'round': max(known_rounds) + 1, 'sent_date': False})

    return result
|