# -*- coding: utf-8 -*- # Copyright (c) 2020, Studio Infinity and contributors # For license information, please see license.txt from __future__ import unicode_literals import frappe from frappe.model.document import Document from frappe.utils.csvutils import read_csv_content from frappe.utils.dateutils import parse_date from fnmatch import fnmatch as wildcard_match from operator import itemgetter from os.path import commonprefix import re class JWImport(Document): account_mapping_fields = 'charge_type charge_subtype map_to_account'.split() def process_invoice_file(self): fcontent = frappe.get_doc('File', dict(file_url=self.load_from_file)).get_content() rows = read_csv_content(fcontent) header = [frappe.scrub(h) for h in rows.pop(0)] self.set('lines', []) # Load the base data from the file for row in rows: rec = self.append('lines', {}) for fieldname, val in zip(header, row): if val: if fieldname[-4:] == 'date': val = parse_date(val) elif fieldname == 'amount': val = re.sub('^[^.,\d]*', '', val) rec.set(fieldname, val) # Now determine all of the derived data self.map_accounts(self.lines) self.determine_references(self.lines) #self.check_data(self.lines) #self.determine_disposition(self.lines) def map_accounts(self, recs): amf = self.account_mapping_fields acc_maps = frappe.get_all('JW Account Mapping', fields=amf) for rec in recs: for amap in acc_maps: if all(wildcard_match((rec.get(fld,'') or ''), amap.get(fld)) for fld in amf[:-1]): rec.set('account', amap.get(amf[-1])) break def determine_references(self, recs): expense_acct = frappe.get_value('Company', self.justworks_company, 'default_expense_claim_payable_account') # First determine the reference type for each record for rec in recs: acct = rec.get('account', '') if not acct: continue if acct == expense_acct: rec.set('reference_doctype', 'Expense Claim') continue if frappe.get_value('Account', acct, 'account_type') == 'Payable': rec.set('reference_doctype', 'Purchase Invoice') continue if frappe.get_value('Account', acct, 'root_type') == 'Expense': rec.set('reference_doctype', 'Employee') # Now try to determine the matching doc for each record for rec in recs: doctype = rec.get('reference_doctype', '') if not doctype or not rec.get('reference', ''): continue if doctype == 'Purchase Invoice': self.search_invoice(rec) continue emp = self.search_employee(rec) if not emp: continue if doctype == 'Employee': rec.set('reference_doc', emp) continue # Ok, this is an expense claim: self.search_expense_claim(rec, emp) # Look for and return an employee matching the reference of the given # record, if any def search_employee(self, rec): fullref = rec.reference (last, comma, restref) = fullref.partition(',') emps = (frappe.get_all('Employee', filters={'justworks_name': fullref}) or frappe.get_all('Employee', filters={'last_name': last}, fields=['name', 'first_name'])) if not emps: return '' if len(emps) == 1: return emps[0].name # We have multiple employees with same last name, take # best match for first name first = restref.partition(' ')[0] closest = max(emps, key=lambda e: len(commonprefix(e, first))) return closest.name # Look for a purchase invoice with the same supplier and amount # as rec and fill in the reference_doc if found def search_invoice(self, rec): # Reference should be a supplier suppliers = (frappe.get_all('Supplier', {'supplier_name': rec.reference}) or frappe.get_all('Supplier', { 'supplier_name': ['like', f"%{rec.reference}%"]}) ) if not suppliers or len(suppliers) != 1: return suplr = suppliers[0].name invoices = frappe.get_all('Purchase Invoice', filters={'supplier': suplr, 'company': self.justworks_company}, fields=['name', 'posting_date as date', 'grand_total', 'rounded_total', 'outstanding_amount']) if not invoices: return if len(invoices) == 1: rec.set('reference_doc', invoices[0].name) return match_inv = ([inv for inv in invoices if inv.outstanding_amount == rec.amount] or [inv for inv in invoices if inv.outstanding_amount > rec.amount]) if len(match_inv) > 0: # Best match is earliest unpaid invoice earliest = min(match_inv, key=itemgetter('date')) rec.set('reference_doc', earliest.name) return match_inv = [inv for inv in invoices if inv.grand_total == rec.amount or inv.rounded_total == rec.amount] if len(match_inv) > 0: latest = max(match_inv, key=itemgetter('date')) rec.set('reference_doc', latest.name) # Hmmm, supplier has invoices but none with # enough left to be paid and none with matching # amount, so punt. # Look for an expense claim with given emp and the same amount as # the given rec, and if one is found, fill in the reference_doc def search_expense_claim(self, rec, emp): claims = frappe.get_all('Expense Claim', filters={'employee':emp, 'company': self.justworks_company}, fields=['name', 'total_amount_reimbursed as total', 'is_paid', 'posting_date as date']) if not claims: return if len(claims) == 1: rec.set('reference_doc', claims[0].name) return match_claims = {c for c in claims if c.total == rec.amount} unpaid_claims = {c for c in claims if c.total >= rec.amount and not c.is_paid} mu_claims = match_claims & unpaid_claims if len(mu_claims) > 0: earliest = min(mu_claims, key=itemgetter('date')) rec.set('reference_doc', earliest.name) return if len(match_claims) > 0: latest = max(match_claims, key=itemgetter('date')) rec.set('reference_doc', latest.name) return if len(unpaid_claims) > 0: earliest = min(unpaid_claims, key=itemgetter('date')) rec.set('reference_doc', earliest.name) return # Hmmm, employee has claims but no large enough unpaid ones # and none with matching amount, so punt.