# 753-Data-Sync/app.py
#
# 345 lines
# 13 KiB
# Python

import requests
import logging
import sys
import os
import json
from datetime import datetime
from datetime import timedelta
import argparse
import urllib.parse
from dotenv import load_dotenv
import time
# Load environment variables from the 753DataSync.env file up front, because
# LOG_LEVEL is read with os.getenv() at import time further down this module.
load_dotenv("753DataSync.env")
# Configuration
# URL template filled in as "<api_url>/<page_number>/<results_per_page>".
BASE_URL = "{}/{}/{}"
# Upper-cased level name; other code compares it against 'DEBUG'.
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()

# Level names accepted in LOG_LEVEL. Anything else falls back to INFO.
_LOG_LEVELS = {
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
    'WARNING': logging.WARNING,
    'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL,
}


def resolve_log_level(name):
    """Return the numeric logging level for *name*, defaulting to INFO.

    The lookup is case-insensitive. Unknown names do not raise; they map to
    logging.INFO so that a typo in LOG_LEVEL cannot crash the script at
    import time.
    """
    return _LOG_LEVELS.get(str(name).upper(), logging.INFO)


# Setup logging
# Resolve the level once so the logger and both handlers always agree.
numeric_log_level = resolve_log_level(log_level)

logger = logging.getLogger()
logger.setLevel(numeric_log_level)

# Log format
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# File handler
file_handler = logging.FileHandler('753DataSync.log')
file_handler.setLevel(numeric_log_level)
file_handler.setFormatter(formatter)

# Stream handler (console output)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(numeric_log_level)
stream_handler.setFormatter(formatter)

# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
def fetch_data(api_url, page_number, results_per_page, timeout=30):
    """Fetch one page of results from the API and return the decoded JSON.

    The request URL is built from BASE_URL as
    ``<api_url>/<page_number>/<results_per_page>``.

    Args:
        api_url: Base URL of the API.
        page_number: Page to request.
        results_per_page: Page size to request.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout a stalled connection would block the script forever.

    Returns:
        The JSON-decoded response body.

    Any HTTP, request or unexpected error is logged and then terminates the
    process through ``sys.exit(1)``.
    """
    url = BASE_URL.format(api_url, page_number, results_per_page)
    try:
        logger.info(f"Making request to: {url} with page_number={page_number} and results_per_page={results_per_page}")
        response = requests.get(url, timeout=timeout)
        # Turn 4xx/5xx responses into HTTPError.
        response.raise_for_status()
        logger.info(f"Successfully fetched data from {url}. Status code: {response.status_code}.")
        logger.debug(f"GET request to {url} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")
        return response.json()
    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred while fetching data from {url}: {http_err}")
        sys.exit(1)
    except requests.exceptions.RequestException as req_err:
        # Covers connection failures and timeouts.
        logger.error(f"Request error occurred while fetching data from {url}: {req_err}")
        sys.exit(1)
    except Exception as err:
        logger.exception(f"An unexpected error occurred while fetching data from {url}: {err}")
        sys.exit(1)
def save_json(data, filename):
    """Write *data* to *filename* as indented UTF-8 JSON.

    The parent directory of *filename* is created when it does not exist
    yet, so the function works for any target path and not only for files
    under ``data/``.

    Args:
        data: JSON-serialisable object to store.
        filename: Destination path of the JSON file.

    Any failure is logged and then terminates the process through
    ``sys.exit(1)``.
    """
    directory = os.path.dirname(filename)
    try:
        # An empty dirname means "current directory": nothing to create.
        if directory and not os.path.isdir(directory):
            os.makedirs(directory, exist_ok=True)
            logger.info(f"Directory '{directory}' created.")
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        logger.info(f"Data successfully saved to {filename}")
    except OSError as e:
        # IOError is an alias of OSError on Python 3, so one handler suffices.
        logger.error(f"OS error occurred while saving JSON data to {filename}: {e}")
        sys.exit(1)
    except Exception as e:
        # For example a TypeError raised by json.dump for unserialisable data.
        logger.error(f"Unexpected error occurred while saving JSON data to {filename}: {e}")
        sys.exit(1)
def _positive_int(value):
    """argparse ``type`` callable: parse *value* as an integer >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def parse_arguments(argv=None):
    """Parse command-line arguments and return the requested page size.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads the arguments from ``sys.argv``.

    Returns:
        The value of ``--results_per_page`` as an int (default 100).

    Invalid values, including zero and negative numbers, make argparse print
    a usage error and exit with status 2. A non-positive page size is
    rejected because pagination stops on ``len(page) < results_per_page``,
    which can never be true for such values.
    """
    parser = argparse.ArgumentParser(description="Fetch enforcement data from the NYSDPS API.")
    parser.add_argument(
        '--results_per_page',
        type=_positive_int,
        default=100,
        help="Number of results per page (default: 100)",
    )
    args = parser.parse_args(argv)
    return args.results_per_page
def generate_token(username, password, url="https://www.arcgis.com/sharing/rest/generateToken", timeout=30):
    """Request an authentication token and return it.

    Args:
        username: Account name sent to the token endpoint.
        password: Password sent to the token endpoint.
        url: Token endpoint to POST the credentials to.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        The value of the ``token`` member of the JSON response.

    Request failures, unexpected errors and responses without a token are
    logged and then terminate the process through ``sys.exit(1)``.
    """
    payload = {
        'f': 'json',
        'username': username,
        'password': password,
        'client': 'referer',
        'referer': 'https://www.arcgis.com',
        'expiration': '120'
    }
    try:
        logger.info(f"Generating token for username '{username}' using URL: {url}")
        response = requests.post(url, data=payload, timeout=timeout)
        logger.debug(f"POST request to {url} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")
        response.raise_for_status()  # Raise an error for bad status codes
        body = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error generating token for username '{username}': {e}")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Unexpected error generating token for username '{username}': {e}")
        sys.exit(1)

    token = body.get('token') if isinstance(body, dict) else None
    if not token:
        # A body without a token may carry an 'error' member explaining why.
        # Surface it instead of dropping it.
        detail = body.get('error') if isinstance(body, dict) else None
        if detail:
            logger.error(f"Token not found in the response: {detail}")
        else:
            logger.error("Token not found in the response.")
        sys.exit(1)

    logger.info("Token generated successfully.")
    return token
def truncate(token, hostname, instance, fs, layer, secure=True):
    """Request truncation of one layer of a feature service.

    Args:
        token: Authentication token appended to the request URL.
        hostname: Host name of the server.
        instance: Instance path segment placed after the host name.
        fs: Feature service name.
        layer: Layer identifier inside the feature service.
        secure: Use https when True, http otherwise.

    Returns:
        The decoded JSON response on success, or None when the request
        fails, times out, returns a status other than 200, or the body
        contains an ``error`` member.
    """
    protocol = 'https://' if secure else 'http://'
    layer_url = f"{protocol}{hostname}/{instance}/arcgis/rest/admin/services/{fs}/FeatureServer/{layer}"
    # Only this token-free form is ever logged; the token is a credential.
    endpoint = f"{layer_url}/truncate"
    url = f"{endpoint}?token={token}&async=true&f=json"
    try:
        logger.info(f"Attempting to truncate layer {layer} on {hostname}...")
        logger.debug(f"Truncate URL: {endpoint}")
        response = requests.post(url, timeout=30)
        logger.debug(f"POST request to {endpoint} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")
        response.raise_for_status()  # Raise an exception for HTTP errors (4xx, 5xx)

        if response.status_code != 200:
            logger.error(f"Unexpected response for layer {layer}: {response.status_code} - {response.text}")
            return None

        # The service can report a failure inside a 200 response body.
        result = response.json()
        if 'error' in result:
            logger.error(f"Error truncating layer {layer}: {result['error']}")
            return None

        # NOTE(review): the request is sent with async=true and the response
        # is returned without polling, so the truncate may still be running
        # when this returns. Confirm against the service documentation.
        logger.info(f"Successfully truncated layer: {layer_url}.")
        return result
    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out while truncating layer {layer}: {e}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed while truncating layer {layer}: {e}")
        return None
    except Exception as e:
        logger.error(f"An unexpected error occurred while truncating layer {layer}: {e}")
        return None
def add_features(token, hostname, instance, fs, layer, aggregated_data, secure=True):
    """Add features to one layer of a feature service.

    Args:
        token: Authentication token appended to the request URL.
        hostname: Host name of the server.
        instance: Instance path segment placed after the host name.
        fs: Feature service name.
        layer: Layer identifier inside the feature service.
        aggregated_data: JSON-serialisable features, sent as the
            ``features`` form field.
        secure: Use https when True, http otherwise.

    Returns:
        The decoded JSON response of the service. When the request itself
        fails, a dict of the form ``{'error': <message>}`` is returned.
    """
    protocol = 'https://' if secure else 'http://'
    layer_url = f"{protocol}{hostname}/{instance}/arcgis/rest/services/{fs}/FeatureServer/{layer}"
    # Only this token-free form is ever logged; the token is a credential.
    endpoint = f"{layer_url}/addFeatures"
    url = f"{endpoint}?token={token}&rollbackOnFailure=true&f=json"
    logger.info(f"Attempting to add features to {layer_url}...")

    # Form-encoded body: features=<URL-encoded JSON>
    features_json = json.dumps(aggregated_data)
    payload = f"features={urllib.parse.quote(features_json)}"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    try:
        logger.debug(f"Request URL: {endpoint}")
        logger.debug(f"Payload size: {len(features_json)} characters")
        response = requests.post(url, headers=headers, data=payload, timeout=180)
        logger.debug(f"POST request to {endpoint} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")
        response.raise_for_status()  # Raise an error for bad status codes
        if response.status_code == 200:
            logger.debug(f"Response JSON size: {len(response.text)} characters.")
        result = response.json()

        # A 2xx status does not imply success: inspect the body before
        # claiming the features were added.
        if isinstance(result, dict):
            if 'error' in result:
                logger.error(f"Error adding features: {result['error']}")
                return result
            rejected = [
                item for item in result.get('addResults') or []
                if isinstance(item, dict) and not item.get('success', False)
            ]
            if rejected:
                logger.error(f"{len(rejected)} feature(s) were rejected by the service.")
                return result

        logger.info("Features added successfully.")
        return result
    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out while adding features: {e}")
        return {'error': 'Request timed out'}
    except json.JSONDecodeError as e:
        # Placed before RequestException: on recent requests versions the
        # error raised by response.json() derives from both classes, and the
        # broader handler would otherwise shadow this one.
        logger.error(f"Error decoding JSON response while adding features: {e}")
        return {'error': 'Invalid JSON response'}
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error occurred while adding features: {e}")
        return {'error': str(e)}
    except Exception as e:
        logger.error(f"An unexpected error occurred while adding features: {e}")
        return {'error': str(e)}
def _fetch_all_pages(api_url, results_per_page):
    """Fetch every page from the API; return the combined list, or None on failure."""
    all_data = []
    page_number = 1
    while True:
        try:
            data = fetch_data(api_url, page_number, results_per_page)
            if not isinstance(data, list):
                # extend() on a dict would silently append its keys.
                raise TypeError(f"expected a JSON array, got {type(data).__name__}")
            all_data.extend(data)
            # Keep a copy of each individual page when running in DEBUG mode.
            if log_level == 'DEBUG':
                page_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                save_json(
                    data,
                    f"data/enforcement_page_{page_number}_results_{results_per_page}_{page_timestamp}.json",
                )
        except Exception as e:
            logger.error(f"Error fetching or saving data for page {page_number}: {e}", exc_info=True)
            return None
        # A short page is the last page.
        if len(data) < results_per_page:
            logger.info("No more data to fetch, stopping pagination.")
            return all_data
        page_number += 1


def main():
    """Main entry point for the script.

    Steps: parse the command line, read the configuration from the
    environment, obtain a token, download all pages, save the aggregated
    data, truncate the target layer and upload the features.

    The layer is truncated only after the complete data set has been
    fetched and saved, so a failed download never leaves the layer empty.
    The upload is skipped when the truncate fails, so existing features are
    not duplicated.
    """
    start_time = time.time()
    try:
        logger.info("Starting script execution.")

        # Parse command-line arguments
        results_per_page = parse_arguments()
        logger.info(f"Parsed arguments: results_per_page={results_per_page}")

        # Load environment variables
        logger.info("Loading environment variables.")
        load_dotenv("753DataSync.env")

        api_url = os.getenv('API_URL')
        if not api_url:
            logger.error("API_URL environment variable not found.")
            return

        username = os.getenv('AGOL_USER')
        password = os.getenv('AGOL_PASSWORD')
        if not username or not password:
            logger.error("Missing AGOL_USER or AGOL_PASSWORD in environment variables.")
            return

        # ArcGIS host details. A missing value would end up as the literal
        # text "None" inside the request URLs, so fail early instead.
        # NOTE(review): HOSTNAME may already be set by the shell or container
        # runtime - confirm the .env value is the one that takes effect.
        host_settings = {name: os.getenv(name) for name in ('HOSTNAME', 'INSTANCE', 'FS', 'LAYER')}
        missing = [name for name, value in host_settings.items() if not value]
        if missing:
            logger.error(f"Missing {', '.join(missing)} in environment variables.")
            return
        hostname = host_settings['HOSTNAME']
        instance = host_settings['INSTANCE']
        fs = host_settings['FS']
        layer = host_settings['LAYER']

        # Generate the token
        token = generate_token(username, password)

        # Download everything before touching the layer.
        aggregated_data = _fetch_all_pages(api_url, results_per_page)
        if aggregated_data is None:
            logger.error("Aborting before truncation because the data could not be fetched completely.")
            return

        # Save aggregated data
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        aggregated_filename = f"data/aggregated_enforcement_results_{timestamp}.json"
        logger.info(f"Saving aggregated data to {aggregated_filename}.")
        save_json(aggregated_data, aggregated_filename)

        # Truncate the layer before adding new features
        if truncate(token, hostname, instance, fs, layer) is None:
            logger.error("Truncate failed; skipping feature upload to avoid duplicating existing features.")
            return

        # Add the features to the feature layer
        response = add_features(token, hostname, instance, fs, layer, aggregated_data)
        if isinstance(response, dict) and 'error' in response:
            logger.error(f"Adding features failed: {response['error']}")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        return
    finally:
        elapsed_time = timedelta(seconds=time.time() - start_time)
        logger.info(f"Script execution completed in {str(elapsed_time)}.")


if __name__ == "__main__":
    main()