# -*- coding: utf-8 -*-
from odoo import models, fields, api
from datetime import datetime, timedelta
from collections import defaultdict

# Task-domain path used for each supported key of the ``filters`` dict.
_TASK_FILTER_PATHS = {
    'project_id': 'project_id',
    'manager_id': 'project_id.user_id',
    'customer_id': 'project_id.partner_id',
}

# Display labels for the string-encoded task priority values.
_PRIORITY_LABELS = {'0': 'Low', '1': 'Medium', '2': 'High', '3': 'Urgent'}
_DEFAULT_PRIORITY_LABEL = 'Normal'

# ``days_diff`` value reported for tasks that have no deadline.
_NO_DEADLINE_DAYS = 999


class ProjectDashboard(models.Model):
    """Data provider for the project dashboard.

    Every public method returns plain dicts/lists built from
    ``project.project``, ``project.task`` and ``account.analytic.line``
    searches.  All public methods accept an optional ``filters`` dict
    (keys ``project_id``, ``manager_id``, ``customer_id``; each method
    honours only a subset, see the individual docstrings) and an optional
    ``company_id``; when ``company_id`` is falsy the current user's
    company is used.
    """
    _name = 'project.dashboard'
    _description = 'Project Dashboard Data'

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @api.model
    def _dashboard_company(self, company_id=None):
        """Return the company record to report on.

        :param company_id: id of a ``res.company`` or a falsy value
        :return: ``res.company`` record for ``company_id`` when given,
                 otherwise the current user's company
        """
        if company_id:
            return self.env['res.company'].browse(company_id)
        return self.env.user.company_id

    @api.model
    def _dashboard_task_domain(self, filters, company_id, filter_keys=()):
        """Build the base ``project.task`` search domain.

        The domain always restricts to the selected company, applies each
        key of ``filter_keys`` that has a truthy value in ``filters``, and
        restricts to tasks assigned to the current user when that user is
        not in the ``project.group_project_manager`` group.

        :param filters: dict of filter values (``None`` is treated as empty)
        :param company_id: id of the company to report on, or falsy
        :param filter_keys: keys of ``_TASK_FILTER_PATHS`` to honour
        :return: a new list usable as an ORM search domain
        """
        filters = filters or {}
        user = self.env.user
        domain = [('company_id', '=', self._dashboard_company(company_id).id)]
        for key in filter_keys:
            value = filters.get(key)
            if value:
                domain.append((_TASK_FILTER_PATHS[key], '=', value))
        if not user.has_group('project.group_project_manager'):
            domain.append(('user_ids', 'in', [user.id]))
        return domain

    @api.model
    def _closed_stage_ids(self):
        """Return the ids of all task stages flagged as folded.

        Folded stages are what this dashboard treats as "closed".
        """
        return self.env['project.task.type'].search([('fold', '=', True)]).ids

    @api.model
    def _count_tasks_by_label(self, tasks, label_of):
        """Count ``tasks`` grouped by the label ``label_of(task)`` returns.

        :return: list of ``{'name': label, 'value': count}`` dicts in
                 first-seen order
        """
        counts = defaultdict(int)
        for task in tasks:
            counts[label_of(task)] += 1
        return [{'name': label, 'value': count} for label, count in counts.items()]

    # ------------------------------------------------------------------
    # Public dashboard endpoints
    # ------------------------------------------------------------------
    @api.model
    def get_dashboard_kpis(self, filters=None, company_id=None):
        """Return the headline counters of the dashboard.

        Honours the ``manager_id``, ``customer_id`` and ``project_id``
        filters (``project_id`` applies to the task counters only).

        :return: dict with the integer keys ``total_projects``,
                 ``active_tasks``, ``overdue_tasks``, ``today_tasks``,
                 ``my_tasks`` and ``my_overdue_tasks``
        """
        filters = filters or {}
        user = self.env.user
        today = fields.Date.today()
        company = self._dashboard_company(company_id)

        project_domain = [('company_id', '=', company.id)]
        if filters.get('manager_id'):
            project_domain.append(('user_id', '=', filters['manager_id']))
        if filters.get('customer_id'):
            project_domain.append(('partner_id', '=', filters['customer_id']))

        task_domain = self._dashboard_task_domain(
            filters, company_id, ('manager_id', 'customer_id', 'project_id'))

        not_closed = [('stage_id', 'not in', self._closed_stage_ids())]
        overdue = [('date_deadline', '<', today)] + not_closed
        mine = [('user_ids', 'in', [user.id])]
        # Half-open day range instead of "= today": identical for a Date
        # column, and still matches the whole day if date_deadline is stored
        # as a Datetime (where equality would only match midnight).
        due_today = [
            ('date_deadline', '>=', today),
            ('date_deadline', '<', today + timedelta(days=1)),
        ]

        Task = self.env['project.task']
        return {
            'total_projects': self.env['project.project'].search_count(project_domain),
            'active_tasks': Task.search_count(task_domain + not_closed),
            'overdue_tasks': Task.search_count(task_domain + overdue),
            'today_tasks': Task.search_count(task_domain + due_today),
            'my_tasks': Task.search_count(task_domain + mine),
            'my_overdue_tasks': Task.search_count(task_domain + mine + overdue),
        }

    @api.model
    def get_task_deadline_chart(self, filters=None, company_id=None):
        """Return open-task counts bucketed by deadline.

        Honours the ``project_id`` filter.  Tasks in folded stages and
        tasks without a deadline are not counted.

        This method used to be defined twice in the class; the later
        definition (which did not exclude folded stages) silently replaced
        the earlier one.  Only the stage-aware version is kept so the chart
        agrees with the ``overdue_tasks`` KPI.

        :return: list of three ``{'name', 'value'}`` dicts for
                 ``Overdue``, ``Today`` and ``Upcoming``
        """
        today = fields.Date.today()
        domain = self._dashboard_task_domain(filters, company_id, ('project_id',))
        domain += [
            ('stage_id', 'not in', self._closed_stage_ids()),
            # Tasks without a deadline fall in no bucket; skip loading them.
            ('date_deadline', '!=', False),
        ]

        overdue = due_today = upcoming = 0
        for task in self.env['project.task'].search(domain):
            # to_date() normalises both date and datetime values to a date.
            deadline = fields.Date.to_date(task.date_deadline)
            if not deadline:
                continue
            if deadline < today:
                overdue += 1
            elif deadline == today:
                due_today += 1
            else:
                upcoming += 1

        return [
            {'name': 'Overdue', 'value': overdue},
            {'name': 'Today', 'value': due_today},
            {'name': 'Upcoming', 'value': upcoming},
        ]

    @api.model
    def get_tasks_by_project(self, filters=None, company_id=None):
        """Return task counts grouped by project name.

        Honours the ``manager_id`` and ``customer_id`` filters.  Tasks
        without a project (or whose project has no name) are reported
        under ``Unassigned``.

        NOTE(review): ``project_id`` is not honoured here although other
        endpoints honour it - confirm this is intended.

        :return: list of ``{'name': project_name, 'value': count}`` dicts
        """
        domain = self._dashboard_task_domain(
            filters, company_id, ('manager_id', 'customer_id'))
        tasks = self.env['project.task'].search(domain)
        return self._count_tasks_by_label(
            tasks, lambda task: task.project_id.name or 'Unassigned')

    @api.model
    def get_tasks_by_stage(self, filters=None, company_id=None):
        """Return task counts grouped by stage name.

        Honours the ``project_id`` filter.  Tasks without a stage are
        reported under ``No Stage``.

        NOTE(review): ``manager_id`` / ``customer_id`` are not honoured
        here although the KPI endpoint honours them - confirm intended.

        :return: list of ``{'name': stage_name, 'value': count}`` dicts
        """
        domain = self._dashboard_task_domain(filters, company_id, ('project_id',))
        tasks = self.env['project.task'].search(domain)
        return self._count_tasks_by_label(
            tasks, lambda task: task.stage_id.name or 'No Stage')

    @api.model
    def get_all_tasks(self, filters=None, limit=10, offset=0, company_id=None):
        """Return one page of tasks ordered by deadline.

        Honours the ``project_id``, ``manager_id`` and ``customer_id``
        filters.

        :param limit: maximum number of tasks in the page
        :param offset: number of tasks to skip
        :return: ``{'tasks': [...], 'total': int}`` where each task dict
                 has the keys ``id``, ``name``, ``project``, ``deadline``
                 (``YYYY-MM-DD`` or ``''``), ``days_diff`` (days until the
                 deadline, negative when overdue, 999 without deadline),
                 ``status`` (``overdue`` / ``today`` / ``upcoming``),
                 ``priority`` and ``priority_label``; ``total`` is the
                 number of tasks matching the filters regardless of paging
        """
        domain = self._dashboard_task_domain(
            filters, company_id, ('project_id', 'manager_id', 'customer_id'))
        Task = self.env['project.task']
        # "id" is a tie-breaker: many tasks share a deadline (or have none),
        # and paging over a non-unique sort key can repeat or skip rows.
        page = Task.search(domain, limit=limit, offset=offset,
                           order='date_deadline asc, id asc')
        total = Task.search_count(domain)
        today = fields.Date.today()

        rows = []
        for task in page:
            days_diff = _NO_DEADLINE_DAYS
            status = 'upcoming'
            deadline_text = ''
            if task.date_deadline:
                deadline_text = task.date_deadline.strftime('%Y-%m-%d')
                # to_date() normalises both date and datetime values.
                deadline = fields.Date.to_date(task.date_deadline)
                if deadline:
                    days_diff = (deadline - today).days
                    if days_diff < 0:
                        status = 'overdue'
                    elif days_diff == 0:
                        status = 'today'
            priority = str(task.priority)
            rows.append({
                'id': task.id,
                'name': task.name,
                'project': task.project_id.name if task.project_id else '',
                'deadline': deadline_text,
                'days_diff': days_diff,
                'status': status,
                'priority': priority,
                'priority_label': _PRIORITY_LABELS.get(priority, _DEFAULT_PRIORITY_LABEL),
            })
        return {'tasks': rows, 'total': total}

    @api.model
    def get_timesheet_hours(self, filters=None, company_id=None):
        """Return the hours logged on each day of the current month.

        Honours the ``project_id`` filter.  Every day of the month is
        present in the result, with ``0.0`` for days without entries.

        NOTE(review): the search is not restricted to lines that belong
        to a project, so any ``account.analytic.line`` of the company in
        the period contributes its ``unit_amount`` - confirm intended.

        :return: list of ``{'date': 'YYYY-MM-DD', 'hours': float}`` dicts
                 in chronological order
        """
        filters = filters or {}
        today = fields.Date.today()
        first_day = today.replace(day=1)
        # Jumping 32 days always lands in the next month; stepping back one
        # day from its first day yields the last day of the current month.
        last_day = (first_day + timedelta(days=32)).replace(day=1) - timedelta(days=1)

        domain = [
            ('date', '>=', first_day),
            ('date', '<=', last_day),
            ('company_id', '=', self._dashboard_company(company_id).id),
        ]
        if filters.get('project_id'):
            domain.append(('project_id', '=', filters['project_id']))

        hours_by_day = defaultdict(float)
        for line in self.env['account.analytic.line'].search(domain):
            hours_by_day[line.date.strftime('%Y-%m-%d')] += line.unit_amount

        result = []
        day = first_day
        while day <= last_day:
            key = day.strftime('%Y-%m-%d')
            result.append({'date': key, 'hours': hours_by_day.get(key, 0.0)})
            day += timedelta(days=1)
        return result

    @api.model
    def get_priority_wise_tasks(self, filters=None, company_id=None):
        """Return task counts grouped by priority label.

        Honours the ``project_id`` filter.  Priority values without an
        entry in the label table are reported under ``Normal``.

        :return: list of ``{'name': priority_label, 'value': count}`` dicts
        """
        domain = self._dashboard_task_domain(filters, company_id, ('project_id',))
        tasks = self.env['project.task'].search(domain)
        return self._count_tasks_by_label(
            tasks,
            lambda task: _PRIORITY_LABELS.get(str(task.priority), _DEFAULT_PRIORITY_LABEL))

    @api.model
    def get_companies(self):
        """Return the companies the current user can pick in the dashboard.

        Users in ``base.group_multi_company`` get all of their
        ``company_ids``; other users get only their current company.

        :return: list of ``{'id': int, 'name': str}`` dicts
        """
        user = self.env.user
        if user.has_group('base.group_multi_company'):
            companies = user.company_ids
        else:
            companies = user.company_id
        return [{'id': company.id, 'name': company.name} for company in companies]