-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscraper.py
More file actions
113 lines (94 loc) · 5.99 KB
/
scraper.py
File metadata and controls
113 lines (94 loc) · 5.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
import json
import logging
import asyncio
from datetime import datetime
from playwright.async_api import async_playwright
# Process-wide logging configuration: records at INFO and above are written
# through a single stream handler as "timestamp [LEVEL] message".
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by the code in this file.
logger = logging.getLogger(__name__)
class DirectoryScraperPipeline:
    """Scrape event listings from the configured venues into one JSON file.

    Two extraction strategies are used, selected by venue name:

    * ``Venue_1_API_Intercept`` - listen to the page's network traffic and
      normalize JSON responses whose URL contains ``api/v2/events``.
    * ``Venue_2_JS_Heavy`` - load the page, wait a fixed delay for
      client-side rendering, then read titles and dates out of the DOM with
      XPath selectors.

    Normalized records accumulate in ``master_collection`` and are written
    to ``scraped_events.json`` next to this file by ``save_to_disk``.
    """

    # Desktop Chrome user agent sent with every request of the browser context.
    USER_AGENT = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/122.0.0.0 Safari/537.36"
    )
    # Substring identifying the background API responses worth capturing.
    API_URL_MARKER = "api/v2/events"
    NAVIGATION_TIMEOUT_MS = 45000
    # Fixed wait after DOMContentLoaded so JS widgets can render.
    HYDRATION_DELAY_MS = 4000
    # Upper bound on DOM cards extracted per page.
    MAX_DOM_EVENTS = 5
    # Price text used whenever no numeric cost is available.
    PRICE_FALLBACK = "Check Source Site"
    # Placeholder coordinates for DOM-scraped events, which carry no location.
    FALLBACK_LNG = -122.4194
    FALLBACK_LAT = 37.7749

    EVENT_CARD_XPATH = "//div[contains(@class, 'event-card') or contains(@id, 'event-')]"
    # The explicit "xpath=" prefix and the leading "." on BOTH union branches
    # keep each lookup scoped to the card. With a bare "//a | //b" only the
    # first branch is made relative; the second searches the whole document.
    TITLE_XPATH = "xpath=.//h3 | .//span[contains(@class,'title')]"
    DATE_XPATH = "xpath=.//div[contains(@class,'date')] | .//time"

    def __init__(self):
        """Set up the venue list and an empty result collection."""
        self.target_venues = [
            {"name": "Venue_1_API_Intercept", "url": "https://example-venue-one.com/events"},
            {"name": "Venue_2_JS_Heavy", "url": "https://example-venue-two.com/calendar"},
        ]
        self.master_collection = []

    @staticmethod
    def _to_float(value, default=0.0):
        """Return ``value`` as a float, or ``default`` if it cannot be parsed."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @classmethod
    def _normalize_api_event(cls, item):
        """Map one raw API record onto the output event schema.

        Args:
            item: Mapping with the raw keys ``eventName``, ``start_date``,
                ``start_time``, ``cost_cents``, ``tags``, ``longitude`` and
                ``latitude``; any of them may be missing or ``None``.

        Returns:
            dict with keys ``title``, ``date``, ``time``, ``price``,
            ``category``, ``lng`` and ``lat``.
        """
        try:
            # Always two decimals, so 1250 cents reads "$12.50", not "$12.5".
            price = f"${float(item.get('cost_cents')) / 100:.2f}"
        except (TypeError, ValueError):
            # Missing or non-numeric cost: same placeholder as the DOM strategy.
            price = cls.PRICE_FALLBACK

        # A null "tags" value must not break the membership test below.
        tags = item.get("tags") or ""

        return {
            "title": item.get("eventName"),
            "date": item.get("start_date"),
            "time": item.get("start_time"),
            "price": price,
            "category": "Music" if "concert" in tags else "Art",
            "lng": cls._to_float(item.get("longitude", 0)),
            "lat": cls._to_float(item.get("latitude", 0)),
        }

    async def _capture_api_events(self, page, venue):
        """Strategy 1: collect events from intercepted background API responses.

        Args:
            page: Playwright page to navigate.
            venue: Venue dict providing the ``url`` to open.

        Returns:
            list of normalized event dicts captured while the page loaded.
        """
        captured_events = []
        pending = []

        async def handle_response(response):
            if self.API_URL_MARKER not in response.url or response.status != 200:
                return
            try:
                raw_data = await response.json()
                results = raw_data.get("results") or []
            except Exception as json_err:
                logger.error(f"Network intercept JSON parsing anomaly: {json_err}")
                return
            for item in results:
                try:
                    captured_events.append(self._normalize_api_event(item))
                except (AttributeError, TypeError, ValueError) as item_err:
                    # One malformed record must not discard the rest.
                    logger.error(f"Skipping malformed event record: {item_err}")
            logger.info(f"Successfully intercepted {len(captured_events)} clean records via Background API.")

        def schedule_handler(response):
            # Keep a handle on every task so it can be awaited before the
            # results are read; otherwise records may arrive too late.
            pending.append(asyncio.ensure_future(handle_response(response)))

        page.on("response", schedule_handler)
        await page.goto(venue['url'], wait_until="networkidle", timeout=self.NAVIGATION_TIMEOUT_MS)
        if pending:
            # handle_response logs its own failures; nothing to re-raise here.
            await asyncio.gather(*pending, return_exceptions=True)
        return captured_events

    async def _extract_dom_events(self, page, venue):
        """Strategy 2: read event cards out of the rendered DOM.

        Args:
            page: Playwright page to navigate.
            venue: Venue dict providing the ``url`` to open.

        Returns:
            list of event dicts (at most ``MAX_DOM_EVENTS``). Only title and
            date come from the page; the other fields are fixed placeholders.
        """
        await page.goto(venue['url'], wait_until="domcontentloaded", timeout=self.NAVIGATION_TIMEOUT_MS)
        await page.wait_for_timeout(self.HYDRATION_DELAY_MS)

        event_blocks = await page.locator(self.EVENT_CARD_XPATH).all()
        logger.info(f"Located {len(event_blocks)} interactive widget blocks on page canvas.")

        events = []
        for block in event_blocks[:self.MAX_DOM_EVENTS]:
            try:
                title = await block.locator(self.TITLE_XPATH).first.inner_text()
                date_str = await block.locator(self.DATE_XPATH).first.inner_text()
            except Exception as block_err:
                # Best effort: skip cards whose markup does not match, but
                # leave a trace instead of dropping them silently.
                logger.warning(f"Skipping event block that could not be parsed: {block_err}")
                continue
            events.append({
                "title": title.strip(),
                "date": date_str.strip(),
                "time": "19:00",
                "price": self.PRICE_FALLBACK,
                "category": "Other",
                "lng": self.FALLBACK_LNG,
                "lat": self.FALLBACK_LAT,
            })
        return events

    async def _scrape_venue(self, context, venue):
        """Scrape one venue in its own page and add its events to the collection.

        Errors are logged rather than raised so the remaining venues still run.
        """
        page = None
        try:
            logger.info(f"Targeting Visual Source: {venue['name']}")
            page = await context.new_page()

            if venue['name'] == "Venue_1_API_Intercept":
                events = await self._capture_api_events(page, venue)
            elif venue['name'] == "Venue_2_JS_Heavy":
                events = await self._extract_dom_events(page, venue)
            else:
                logger.warning(f"No scraping strategy configured for {venue['name']}")
                events = []

            self.master_collection.extend(events)
        except Exception as venue_exception:
            logger.error(f"Critical execution error targeting {venue['name']}: {venue_exception}")
            # Webhook / Error Notification integration block would run here.
        finally:
            # Close the page on failure too, so a broken venue does not leak it.
            if page is not None:
                try:
                    await page.close()
                except Exception as close_err:
                    logger.warning(f"Could not close page for {venue['name']}: {close_err}")

    async def execute_pipeline(self):
        """Run every configured venue through a headless Chromium, then save.

        Launches one browser and one browser context, scrapes the venues
        sequentially, closes the browser, and writes the collected events to
        disk via ``save_to_disk``.
        """
        logger.info("Initializing Master Web Scraping Pipeline Engine...")
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True, args=["--disable-gpu"])
            try:
                context = await browser.new_context(user_agent=self.USER_AGENT)
                for venue in self.target_venues:
                    await self._scrape_venue(context, venue)
            finally:
                await browser.close()

        # Export processed payloads into standard JSON repository file
        self.save_to_disk()

    def save_to_disk(self):
        """Write ``master_collection`` as indented UTF-8 JSON next to this file."""
        output_path = os.path.join(os.path.dirname(__file__), 'scraped_events.json')
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(self.master_collection, f, indent=4, ensure_ascii=False)
        logger.info(f"Data Sync Complete. Master Payload saved locally to: {output_path}")
if __name__ == "__main__":
    # Script entry point: build the pipeline and run its coroutine to
    # completion with asyncio.run.
    asyncio.run(DirectoryScraperPipeline().execute_pipeline())