"""Ingest contact candidates from the webscraper validation DB into the outreach CSV.

Scans every table in webscraper_validation.db that exposes a URL-like column and a
contact-like column, normalizes the results, and appends any rows not already present
in data/offpage/outreach_database.csv. Each run is recorded in ingestion_log.csv.
"""

import csv
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
DB_PATH = ROOT / "webscraper-main" / "webscraper_validation.db"
OUTREACH_PATH = ROOT / "data" / "offpage" / "outreach_database.csv"
LOG_PATH = ROOT / "data" / "offpage" / "ingestion_log.csv"

# Column order for the outreach CSV; write_rows() always emits this header.
FIELDS = [
    "tier",
    "site",
    "contact",
    "method",
    "acceptance_rate",
    "status",
    "notes",
]


def load_existing_rows(path: Path) -> list[dict]:
    """Return the outreach CSV as a list of dicts, or [] if it does not exist yet."""
    if not path.exists():
        return []
    with path.open("r", encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f))


def write_rows(path: Path, rows) -> None:
    """Rewrite the outreach CSV in full with the canonical FIELDS header."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(rows)


def append_log(total_added: int, total_scanned: int, source: str) -> None:
    """Append one audit line per run; write the header only on first creation."""
    header = ["timestamp", "source", "rows_scanned", "rows_added"]
    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    exists = LOG_PATH.exists()
    with LOG_PATH.open("a", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        if not exists:
            writer.writerow(header)
        # datetime.utcnow() is deprecated; use an explicit UTC-aware timestamp.
        writer.writerow(
            [datetime.now(timezone.utc).isoformat(), source, total_scanned, total_added]
        )


def normalize_domain(url: str) -> str:
    """Reduce a URL to its bare domain: strip scheme, leading www., and any path."""
    value = (url or "").strip().lower()
    value = value.replace("https://", "").replace("http://", "")
    if value.startswith("www."):
        value = value[4:]
    return value.split("/")[0]


def extract_candidates_from_db(path: Path) -> list[dict]:
    """Pull (url, email, phone, name) candidates from every plausible table."""
    if not path.exists():
        return []
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    # Dynamic table discovery so this script works with an evolving schema.
    cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = [r[0] for r in cur.fetchall()]
    candidates = []
    for table in tables:
        # Identifiers cannot be bound as parameters, so quote them instead.
        cur.execute(f'PRAGMA table_info("{table}")')
        cols = [r[1].lower() for r in cur.fetchall()]
        # Only tables with both a URL-like and a contact-like column qualify.
        has_url = "url" in cols or "website" in cols or "domain" in cols
        has_contact = "email" in cols or "contact_email" in cols or "phone" in cols
        if not (has_url and has_contact):
            continue
        # Alias whichever variant exists to canonical url/email/phone/name names,
        # falling back to empty-string literals for optional columns.
        query_cols = []
        if "url" in cols:
            query_cols.append("url")
        elif "website" in cols:
            query_cols.append("website AS url")
        elif "domain" in cols:
            query_cols.append("domain AS url")
        if "email" in cols:
            query_cols.append("email")
        elif "contact_email" in cols:
            query_cols.append("contact_email AS email")
        else:
            query_cols.append("'' AS email")
        if "phone" in cols:
            query_cols.append("phone")
        else:
            query_cols.append("'' AS phone")
        if "name" in cols:
            query_cols.append("name")
        elif "business_name" in cols:
            query_cols.append("business_name AS name")
        else:
            query_cols.append("'' AS name")
        query = f'SELECT {", ".join(query_cols)} FROM "{table}" LIMIT 500'
        try:
            cur.execute(query)
            for row in cur.fetchall():
                candidates.append(
                    {
                        "url": row["url"] if "url" in row.keys() else "",
                        "email": row["email"] if "email" in row.keys() else "",
                        "phone": row["phone"] if "phone" in row.keys() else "",
                        "name": row["name"] if "name" in row.keys() else "",
                        "source": table,
                    }
                )
        except sqlite3.Error:
            # Skip tables that cannot be queried rather than failing the whole run.
            continue
    conn.close()
    return candidates


def main() -> None:
    existing = load_existing_rows(OUTREACH_PATH)
    # Dedupe on (domain, contact); guard against missing CSV fields, which
    # csv.DictReader returns as None.
    existing_keys = {
        (
            (row.get("site") or "").strip().lower(),
            (row.get("contact") or "").strip().lower(),
        )
        for row in existing
    }
    candidates = extract_candidates_from_db(DB_PATH)
    added = 0
    for c in candidates:
        domain = normalize_domain(c.get("url", ""))
        email = (c.get("email") or "").strip().lower()
        phone = (c.get("phone") or "").strip()
        # Prefer email; fall back to phone when no email was scraped.
        contact = email if email else phone
        if not domain or not contact:
            continue
        key = (domain, contact)
        if key in existing_keys:
            continue
        existing_keys.add(key)
        existing.append(
            {
                "tier": "Pipeline",
                "site": domain,
                "contact": contact,
                "method": "Webscraper ingest",
                "acceptance_rate": "TBD",
                "status": "Queued",
                # SQLite NULLs arrive as None; coalesce before slicing the name.
                "notes": f"source={c.get('source', 'unknown')}|name={(c.get('name') or '')[:60]}",
            }
        )
        added += 1
    write_rows(OUTREACH_PATH, existing)
    append_log(added, len(candidates), "webscraper_validation.db")
    print(f"scanned={len(candidates)} added={added} total={len(existing)}")


if __name__ == "__main__":
    main()