"""Sync NYSDPS enforcement data into an ArcGIS Online feature layer.

Fetches paginated JSON from the API named by the API_URL environment
variable, saves the aggregated result to disk, truncates the target
feature layer, and re-populates it via the ArcGIS REST addFeatures call.
"""

import argparse
import json
import logging
import os
import sys
import urllib.parse
from datetime import datetime

import requests
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv("753DataSync.env")

# Configuration: BASE_URL is filled as "<api_url>/<page_number>/<results_per_page>"
BASE_URL = "{}/{}/{}"
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()

# Resolve the configured level name to a numeric level once, falling back
# to INFO for unknown names.  (Previously an invalid LOG_LEVEL fell back
# correctly for the logger but getattr(logging, log_level) on the handlers
# raised AttributeError.)
_numeric_level = getattr(logging, log_level, None)
if not isinstance(_numeric_level, int):
    _numeric_level = logging.INFO

# Setup logging on the root logger.
logger = logging.getLogger()
logger.setLevel(_numeric_level)

# File handler
file_handler = logging.FileHandler('753DataSync.log')
file_handler.setLevel(_numeric_level)

# Stream handler (console output)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(_numeric_level)

# Log format
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)

# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)


def fetch_data(api_url, page_number, results_per_page):
    """Fetch one page of data from the API and return the parsed JSON.

    The URL is built as "<api_url>/<page_number>/<results_per_page>".
    Exits the process (sys.exit(1)) on any request or HTTP error,
    matching the script's fail-fast behavior.
    """
    url = BASE_URL.format(api_url, page_number, results_per_page)
    try:
        logger.info(f"Making request to: {url} with page_number={page_number} "
                    f"and results_per_page={results_per_page}")
        # Timeout guards against the request hanging indefinitely.
        response = requests.get(url, timeout=60)

        # Check for HTTP errors
        response.raise_for_status()

        logger.info(f"Successfully fetched data from {url}. "
                    f"Status code: {response.status_code}.")
        logger.debug(f"GET request to {url} completed with status code "
                     f"{response.status_code}. Response time: "
                     f"{response.elapsed.total_seconds()} seconds.")
        return response.json()
    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred while fetching data from {url}: {http_err}")
        sys.exit(1)
    except requests.exceptions.RequestException as req_err:
        logger.error(f"Request error occurred while fetching data from {url}: {req_err}")
        sys.exit(1)
    except Exception as err:
        logger.exception(f"An unexpected error occurred while fetching data from {url}: {err}")
        sys.exit(1)


def save_json(data, filename):
    """Save *data* as pretty-printed UTF-8 JSON to *filename*.

    Creates the 'data' directory if missing.  Exits the process on failure.
    """
    try:
        # Ensure the output directory exists.
        if not os.path.exists('data'):
            os.makedirs('data')
            logger.info("Directory 'data' created.")

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        logger.info(f"Data successfully saved to {filename}")
    except OSError as e:
        # IOError is an alias of OSError in Python 3, so a single handler
        # covers both (the previous separate IOError branch was unreachable).
        logger.error(f"OS error occurred while saving JSON data to {filename}: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error occurred while saving JSON data to {filename}: {e}")
        sys.exit(1)


def parse_arguments():
    """Parse command-line arguments and return the results-per-page count."""
    parser = argparse.ArgumentParser(description="Fetch enforcement data from the NYSDPS API.")
    parser.add_argument('--results_per_page', type=int, default=100,
                        help="Number of results per page (default: 100)")
    args = parser.parse_args()
    return args.results_per_page


def generate_token(username, password, url="https://www.arcgis.com/sharing/rest/generateToken"):
    """Generate an ArcGIS Online authentication token.

    Returns the token string.  Exits the process (sys.exit(1)) on any
    request failure or when the response contains no token.
    """
    payload = {
        'f': 'json',
        'username': username,
        'password': password,
        'client': 'referer',
        'referer': 'https://www.arcgis.com',
        'expiration': '120',  # token lifetime in minutes
    }
    try:
        logger.info(f"Generating token for username '{username}' using URL: {url}")
        response = requests.post(url, data=payload, timeout=60)

        logger.debug(f"POST request to {url} completed with status code "
                     f"{response.status_code}. Response time: "
                     f"{response.elapsed.total_seconds()} seconds.")
        response.raise_for_status()  # Raise an error for bad status codes

        token = response.json().get('token')
        if token:
            logger.info("Token generated successfully.")
        else:
            logger.error("Token not found in the response.")
            sys.exit(1)
        return token
    except requests.exceptions.RequestException as e:
        logger.error(f"Error generating token for username '{username}': {e}")
        sys.exit(1)
    except KeyError as e:
        logger.error(f"Error extracting token from the response: Missing key {e}")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Unexpected error generating token for username '{username}': {e}")
        sys.exit(1)


def truncate(token, hostname, instance, fs, layer, secure=True):
    """Submit an async truncate job for the given feature-service layer.

    Returns the parsed JSON response on success, or None on any failure
    (the caller decides whether to continue).
    """
    protocol = 'https://' if secure else 'http://'
    admin_url = (f"{protocol}{hostname}/{instance}/arcgis/rest/admin/services/"
                 f"{fs}/FeatureServer/{layer}/truncate")
    url = f"{admin_url}?token={token}&async=true&f=json"
    try:
        logger.info(f"Attempting to truncate layer {layer} on {hostname}...")
        # Log the endpoint WITHOUT the token query parameter so the
        # credential never lands in the log file.
        logger.debug(f"Truncate URL: {admin_url}")

        response = requests.post(url, timeout=30)

        logger.debug(f"POST request completed with status code "
                     f"{response.status_code}. Response time: "
                     f"{response.elapsed.total_seconds()} seconds.")
        response.raise_for_status()  # Raise an exception for HTTP errors (4xx, 5xx)

        # Check for ArcGIS-level errors reported inside a 200 response body.
        if response.status_code == 200:
            result = response.json()
            if 'error' in result:
                logger.error(f"Error truncating layer {layer}: {result['error']}")
                return None
            logger.info(f"Successfully truncated layer: {admin_url}.")
            return result
        logger.error(f"Unexpected response for layer {layer}: "
                     f"{response.status_code} - {response.text}")
        return None
    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out while truncating layer {layer}: {e}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed while truncating layer {layer}: {e}")
        return None
    except Exception as e:
        logger.error(f"An unexpected error occurred while truncating layer {layer}: {e}")
        return None


def add_features(token, hostname, instance, fs, layer, aggregated_data, secure=True):
    """POST features to the layer's addFeatures endpoint.

    Returns the parsed JSON response on success, or a dict with an
    'error' key on failure.
    """
    protocol = 'https://' if secure else 'http://'
    service_url = (f"{protocol}{hostname}/{instance}/arcgis/rest/services/"
                   f"{fs}/FeatureServer/{layer}")
    url = f"{service_url}/addFeatures?token={token}&rollbackOnFailure=true&f=json"
    logger.info(f"Attempting to add features to {service_url}...")

    # Prepare the form-urlencoded payload: features=<url-encoded JSON>.
    features_json = json.dumps(aggregated_data)
    payload = f"features={urllib.parse.quote(features_json)}"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    try:
        # Log request details without the token query parameter.
        logger.debug(f"Request URL: {service_url}/addFeatures")
        logger.debug(f"Payload size: {len(features_json)} characters")

        response = requests.post(url, headers=headers, data=payload, timeout=180)

        logger.debug(f"POST request completed with status code "
                     f"{response.status_code}. Response time: "
                     f"{response.elapsed.total_seconds()} seconds.")
        response.raise_for_status()  # Raise an error for bad status codes

        result = response.json()
        logger.info("Features added successfully.")
        logger.debug(f"Response JSON size: {len(response.text)} characters.")
        return result
    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out while adding features: {e}")
        return {'error': 'Request timed out'}
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error occurred while adding features: {e}")
        return {'error': str(e)}
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON response while adding features: {e}")
        return {'error': 'Invalid JSON response'}
    except Exception as e:
        logger.error(f"An unexpected error occurred while adding features: {e}")
        return {'error': str(e)}


def main():
    """Entry point: fetch all pages, save them, then truncate and upload."""
    try:
        logger.info("Starting script execution.")

        results_per_page = parse_arguments()
        logger.info(f"Parsed arguments: results_per_page={results_per_page}")

        # Environment variables were already loaded via load_dotenv at
        # import time; just read them here.
        api_url = os.getenv('API_URL')
        if not api_url:
            logger.error("API_URL environment variable not found.")
            return

        username = os.getenv('AGOL_USER')
        password = os.getenv('AGOL_PASSWORD')
        if not username or not password:
            logger.error("Missing AGOL_USER or AGOL_PASSWORD in environment variables.")
            return
        token = generate_token(username, password)

        # ArcGIS host details — fail early if any is missing.
        hostname = os.getenv('HOSTNAME')
        instance = os.getenv('INSTANCE')
        fs = os.getenv('FS')
        layer = os.getenv('LAYER')
        if not all([hostname, instance, fs, layer]):
            logger.error("Missing HOSTNAME, INSTANCE, FS or LAYER in environment variables.")
            return

        # Truncate the layer before adding new features.
        # NOTE(review): the truncate job is submitted with async=true but its
        # status is never polled before addFeatures — confirm the service
        # finishes truncation fast enough in practice.
        truncate(token, hostname, instance, fs, layer)

        all_data = []
        page_number = 1
        # Fix a timestamp for the whole run up front, so the aggregated
        # filename below is always defined even if the first fetch
        # iteration fails (previously this raised NameError).
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        while True:
            try:
                data = fetch_data(api_url, page_number, results_per_page)
                all_data.extend(data)

                # In DEBUG mode keep each page as its own file for inspection.
                if log_level == 'DEBUG':
                    page_filename = (f"data/enforcement_page_{page_number}_"
                                     f"results_{results_per_page}_{timestamp}.json")
                    save_json(data, page_filename)

                # A short page means we've reached the last page.
                if len(data) < results_per_page:
                    logger.info("No more data to fetch, stopping pagination.")
                    break
                page_number += 1
            except Exception as e:
                logger.error(f"Error fetching or saving data for page {page_number}: {e}",
                             exc_info=True)
                break

        # Save aggregated data to a single JSON file.
        aggregated_filename = f"data/aggregated_enforcement_results_{timestamp}.json"
        logger.info(f"Saving aggregated data to {aggregated_filename}.")
        save_json(all_data, aggregated_filename)

        # Push the collected features to the feature layer.
        add_features(token, hostname, instance, fs, layer, all_data)
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        return
    finally:
        logger.info("Script execution completed.")


if __name__ == "__main__":
    main()