753-Data-Sync/app.py
Nick Heppler 3afe292b23 fix: Ensure feature layer is truncated at the start of script execution
- Moved the call to the truncate function to always execute at the beginning of the main function.
- Ensured that the feature layer is cleared before loading data from a JSON file or fetching from the API.
2025-05-22 12:12:01 -04:00

407 lines
15 KiB
Python

import requests
import logging
import sys
import os
import json
from datetime import datetime
from datetime import timedelta
import argparse
import urllib.parse
from dotenv import load_dotenv
import time

# Load environment variables from .env file
load_dotenv("753DataSync.env")

# Configuration: formatted as "<api_url>/<page_number>/<results_per_page>"
BASE_URL = "{}/{}/{}"

# Normalize the requested log level once.  Unknown values fall back to INFO
# so the getattr(logging, log_level) lookups below can never raise
# AttributeError (the original if/elif chain defaulted the logger to INFO
# but still passed the bogus name to getattr for the handlers).
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
if log_level not in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
    log_level = 'INFO'

# Get the current date for dynamic log file naming
current_date = datetime.now().strftime("%Y-%m-%d")
log_filename = f"753DataSync_{current_date}.log"

# Setup logging on the root logger so all module code shares the handlers
logger = logging.getLogger()
logger.setLevel(getattr(logging, log_level))

# File handler for dynamic (dated) log file
file_handler = logging.FileHandler(log_filename)
file_handler.setLevel(getattr(logging, log_level))

# Stream handler (console output)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(getattr(logging, log_level))

# Log format shared by both handlers
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)

# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
def purge_old_files(purge_days):
    """Purge log and data files older than *purge_days* days.

    Log files (``*.log``) live in the current directory; data files
    (``*.json``) live in the ``data`` folder.  Files whose modification
    time is older than the threshold are deleted.

    Args:
        purge_days: Age threshold in days; older files are removed.
    """
    data_folder = 'data'
    log_folder = '.'  # Log files are in the current directory
    purge_threshold = datetime.now() - timedelta(days=purge_days)

    # Delete old log files first.  Log purging is independent of the data
    # folder, so a missing 'data' directory no longer skips it (the original
    # returned early before purging any logs).
    for filename in os.listdir(log_folder):
        if filename.endswith(".log"):
            file_path = os.path.join(log_folder, filename)
            file_modified_time = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_modified_time < purge_threshold:
                logger.info(f"Deleting old log file: {file_path}")
                os.remove(file_path)

    # Delete old data files (only if the data folder exists)
    if not os.path.exists(data_folder):
        logger.warning(f"The '{data_folder}' folder does not exist.")
        return
    for filename in os.listdir(data_folder):
        file_path = os.path.join(data_folder, filename)
        if filename.endswith(".json"):
            file_modified_time = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_modified_time < purge_threshold:
                logger.info(f"Deleting old data file: {file_path}")
                os.remove(file_path)
def fetch_data(api_url, page_number, results_per_page):
    """Fetches data from the API and returns the response.

    Args:
        api_url: Base endpoint URL (first slot of BASE_URL).
        page_number: Page index to request.
        results_per_page: Page size requested from the API.

    Returns:
        The parsed JSON body of the response (presumably a list of
        enforcement records — see main()'s use of extend/len).

    Note:
        Exits the process with status 1 on any HTTP, request, or
        unexpected error instead of raising to the caller.
    """
    # BASE_URL joins the parts as "<api_url>/<page_number>/<results_per_page>"
    url = BASE_URL.format(api_url, page_number, results_per_page)
    try:
        logger.info(f"Making request to: {url} with page_number={page_number} and results_per_page={results_per_page}")
        response = requests.get(url)
        # Check for HTTP errors (raises HTTPError for 4xx/5xx)
        response.raise_for_status()
        # Success log
        logger.info(f"Successfully fetched data from {url}. Status code: {response.status_code}.")
        # Debug log with additional response details
        logger.debug(f"GET request to {url} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")
        # Return JSON data
        return response.json()
    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred while fetching data from {url}: {http_err}")
        sys.exit(1)
    except requests.exceptions.RequestException as req_err:
        # HTTPError is caught above; this catches connection/timeout errors
        logger.error(f"Request error occurred while fetching data from {url}: {req_err}")
        sys.exit(1)
    except Exception as err:
        # logger.exception also records the traceback
        logger.exception(f"An unexpected error occurred while fetching data from {url}: {err}")
        sys.exit(1)
def save_json(data, filename):
    """Save *data* as pretty-printed UTF-8 JSON to *filename*.

    Creates the ``data`` directory if it does not exist.  Exits the
    process with status 1 on any OS/IO or unexpected error.

    Args:
        data: JSON-serializable object to write.
        filename: Destination path (callers pass paths under ``data/``).
    """
    try:
        # Ensure the output directory exists
        if not os.path.exists('data'):
            os.makedirs('data')
            logger.info("Directory 'data' created.")
        # Save data to file
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        logger.info(f"Data successfully saved to {filename}")
    # In Python 3, IOError is an alias of OSError, so one handler covers both
    # (the original's separate IOError branch was unreachable).
    except OSError as e:
        logger.error(f"OS error occurred while saving JSON data to {filename}: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error occurred while saving JSON data to {filename}: {e}")
        sys.exit(1)
def parse_arguments():
    """Parse command-line arguments.

    Returns:
        tuple: ``(results_per_page, test_mode, reload_file)`` where
        ``results_per_page`` is an int (default 100), ``test_mode`` is a
        bool, and ``reload_file`` is a path string or None.
    """
    arg_parser = argparse.ArgumentParser(
        description="Fetch enforcement data from the NYSDPS API.")
    # Page size for API pagination
    arg_parser.add_argument('--results_per_page', type=int, default=100,
                            help="Number of results per page (default: 100)")
    # Test flag: stop after the first page
    arg_parser.add_argument('--test', action='store_true',
                            help="If set, only fetch the first page of results.")
    # Reload flag: skip the API and load a previously saved file
    arg_parser.add_argument('--reload', type=str,
                            help="If set, load data from the specified file instead of fetching from the API.")
    ns = arg_parser.parse_args()
    return ns.results_per_page, ns.test, ns.reload
def generate_token(username, password, url="https://www.arcgis.com/sharing/rest/generateToken"):
    """Generate an ArcGIS Online authentication token.

    Args:
        username: AGOL user name.
        password: AGOL password.
        url: Token endpoint (defaults to arcgis.com).

    Returns:
        str: The token string.  Exits the process with status 1 on any
        request failure or when the response contains no token.
    """
    payload = {
        'f': 'json',
        'username': username,
        'password': password,
        'client': 'referer',
        'referer': 'https://www.arcgis.com',
        'expiration': '120'  # token lifetime in minutes
    }
    try:
        logger.info(f"Generating token for username '{username}' using URL: {url}")
        response = requests.post(url, data=payload)
        # Log the request status and response time
        logger.debug(f"POST request to {url} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")
        response.raise_for_status()  # Raise an error for bad status codes
        # .get() returns None when 'token' is absent — it never raises
        # KeyError, so the original's KeyError handler was unreachable.
        token = response.json().get('token')
        if token:
            logger.info("Token generated successfully.")
        else:
            logger.error("Token not found in the response.")
            sys.exit(1)
        return token
    except requests.exceptions.RequestException as e:
        logger.error(f"Error generating token for username '{username}': {e}")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Unexpected error generating token for username '{username}': {e}")
        sys.exit(1)
def truncate(token, hostname, instance, fs, layer, secure=True):
    """Truncate (remove all features from) a layer in a hosted feature service.

    Args:
        token: ArcGIS authentication token.
        hostname: Server host name.
        instance: ArcGIS instance name.
        fs: Feature service name.
        layer: Layer index to truncate.
        secure: Use HTTPS when True (default), HTTP otherwise.

    Returns:
        dict | None: Parsed JSON response on success, None on any failure.
    """
    protocol = 'https://' if secure else 'http://'
    url = f"{protocol}{hostname}/{instance}/arcgis/rest/admin/services/{fs}/FeatureServer/{layer}/truncate?token={token}&async=true&f=json"
    try:
        # Use the module's configured `logger` (the original called the
        # root-level logging.* functions, inconsistent with the rest of
        # the file; the messages are unchanged).
        logger.info(f"Attempting to truncate layer {layer} on {hostname}...")
        # Debug logging for the URL being used (NOTE: URL embeds the token)
        logger.debug(f"Truncate URL: {url}")
        response = requests.post(url, timeout=30)
        # Log response time
        logger.debug(f"POST request to {url} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")
        # Check for HTTP errors (4xx, 5xx)
        response.raise_for_status()
        if response.status_code == 200:
            result = response.json()
            # ArcGIS can return HTTP 200 with an error payload in the body
            if 'error' in result:
                logger.error(f"Error truncating layer {layer}: {result['error']}")
                return None
            logger.info(f"Successfully truncated layer: {protocol}{hostname}/{instance}/arcgis/rest/admin/services/{fs}/FeatureServer/{layer}.")
            return result
        else:
            logger.error(f"Unexpected response for layer {layer}: {response.status_code} - {response.text}")
            return None
    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out while truncating layer {layer}: {e}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed while truncating layer {layer}: {e}")
        return None
    except Exception as e:
        logger.error(f"An unexpected error occurred while truncating layer {layer}: {e}")
        return None
def add_features(token, hostname, instance, fs, layer, aggregated_data, secure=True):
    """Add features to a feature service layer via the ArcGIS addFeatures endpoint.

    Args:
        token: ArcGIS authentication token.
        hostname: Server host name.
        instance: ArcGIS instance name.
        fs: Feature service name.
        layer: Layer index to add features to.
        aggregated_data: JSON-serializable feature payload.
        secure: Use HTTPS when True (default), HTTP otherwise.

    Returns:
        dict: Parsed JSON response on success, or ``{'error': ...}`` on failure.
    """
    protocol = 'https://' if secure else 'http://'
    url = f"{protocol}{hostname}/{instance}/arcgis/rest/services/{fs}/FeatureServer/{layer}/addFeatures?token={token}&rollbackOnFailure=true&f=json"
    logger.info(f"Attempting to add features to {protocol}{hostname}/{instance}/arcgis/rest/services/{fs}/FeatureServer/{layer}...")
    # Prepare features data as a form-encoded payload
    features_json = json.dumps(aggregated_data)            # JSON string
    features_encoded = urllib.parse.quote(features_json)   # URL-encoded
    payload = f"features={features_encoded}"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    try:
        # Log request details (but avoid logging sensitive data)
        logger.debug(f"Request URL: {url}")
        logger.debug(f"Payload size: {len(features_json)} characters")
        response = requests.post(url, headers=headers, data=payload, timeout=180)
        # Log the response time and status code
        logger.debug(f"POST request to {url} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")
        response.raise_for_status()  # Raise an error for bad status codes
        logger.debug(f"Response JSON size: {len(response.text)} characters.")
        # Parse before declaring success (the original logged success before
        # inspecting the response, and any 2xx status other than 200 fell
        # through and implicitly returned None).
        result = response.json()
        logger.info("Features added successfully.")
        return result
    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out while adding features: {e}")
        return {'error': 'Request timed out'}
    # JSONDecodeError first: in recent requests versions it is also a
    # RequestException subclass and would otherwise be shadowed below.
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON response while adding features: {e}")
        return {'error': 'Invalid JSON response'}
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error occurred while adding features: {e}")
        return {'error': str(e)}
    except Exception as e:
        logger.error(f"An unexpected error occurred while adding features: {e}")
        return {'error': str(e)}
def main():
    """Main entry point.

    Purges old log/data files, generates an ArcGIS token, always truncates
    the target feature layer, then loads features either from a --reload
    file or by paginating the API and pushing the aggregated results.
    """
    start_time = time.time()
    try:
        logger.info("Starting script execution.")

        # Check and purge old files before processing
        purge_days = int(os.getenv("PURGE_DAYS", 30))  # default: 30 days
        logger.info(f"Purging files older than {purge_days} days.")
        purge_old_files(purge_days)

        # Parse command-line arguments
        results_per_page, test_mode, reload_file = parse_arguments()
        logger.info(f"Parsed arguments: results_per_page={results_per_page}, test_mode={test_mode}, reload_file={reload_file}")

        # Load environment variables (already loaded at import time; harmless
        # to re-run here so main() is self-contained if imported elsewhere)
        logger.info("Loading environment variables.")
        load_dotenv("753DataSync.env")
        api_url = os.getenv('API_URL')
        if not api_url:
            logger.error("API_URL environment variable not found.")
            return

        # Generate the token
        username = os.getenv('AGOL_USER')
        password = os.getenv('AGOL_PASSWORD')
        if not username or not password:
            logger.error("Missing AGOL_USER or AGOL_PASSWORD in environment variables.")
            return
        token = generate_token(username, password)

        # Set ArcGIS host details
        hostname = os.getenv('HOSTNAME')
        instance = os.getenv('INSTANCE')
        fs = os.getenv('FS')
        layer = os.getenv('LAYER')

        # Always clear the feature layer before loading new data
        logger.info("Truncating the feature layer.")
        truncate(token, hostname, instance, fs, layer)

        # If --reload flag is set, load data from the specified file
        if reload_file:
            logger.info(f"Reloading data from file: {reload_file}")
            with open(reload_file, 'r', encoding='utf-8') as f:
                aggregated_data = json.load(f)
            # Add the features to the feature layer
            add_features(token, hostname, instance, fs, layer, aggregated_data)
            logger.info("Data reloaded successfully from the specified file.")
            return

        all_data = []
        page_number = 1
        # Pre-seed timestamp so the aggregated filename below is always
        # defined, even if the first iteration fails before assigning it
        # (the original hit a NameError in that case, masked by the outer
        # exception handler).
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        while True:
            try:
                # Fetch data from the API
                data = fetch_data(api_url, page_number, results_per_page)
                # Append features data to the aggregated list
                all_data.extend(data)
                timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                page_filename = f"data/enforcement_page_{page_number}_results_{results_per_page}_{timestamp}.json"
                # Save individual page data only in DEBUG mode
                if log_level == 'DEBUG':
                    save_json(data, page_filename)
                # A short page means this was the last one
                if len(data) < results_per_page:
                    logger.info("No more data to fetch, stopping pagination.")
                    break
                # Break the loop if in test mode
                if test_mode:
                    logger.info("Test mode is enabled, stopping after the first page.")
                    break
                page_number += 1
            except Exception as e:
                logger.error(f"Error fetching or saving data for page {page_number}: {e}", exc_info=True)
                break

        # Save aggregated data
        aggregated_data = all_data
        aggregated_filename = f"data/aggregated_enforcement_results_{timestamp}.json"
        logger.info(f"Saving aggregated data to {aggregated_filename}.")
        save_json(aggregated_data, aggregated_filename)

        # Add the features to the feature layer
        add_features(token, hostname, instance, fs, layer, aggregated_data)
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        return
    finally:
        elapsed_time = timedelta(seconds=time.time() - start_time)
        logger.info(f"Script execution completed in {str(elapsed_time)}.")


if __name__ == "__main__":
    main()