Add app.py script for fetching and syncing enforcement data with ArcGIS.

This commit is contained in:
Nick Heppler 2025-03-26 12:09:09 -04:00
parent c5708d6d79
commit 6bbf29493c

231
app.py Normal file
View File

@ -0,0 +1,231 @@
import requests
import logging
import sys
import os
import json
from datetime import datetime
import argparse
import urllib.parse
from dotenv import load_dotenv
# Configuration
BASE_URL = "{}/{}/{}"
# Setup logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# File handler
file_handler = logging.FileHandler('753DataSync.log')
file_handler.setLevel(logging.INFO)
# Stream handler (console output)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
# Log format
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
def fetch_data(api_url, page_number, results_per_page):
"""Fetches data from the API and returns the response."""
url = BASE_URL.format(api_url, page_number, results_per_page)
try:
logger.info(f"Making request to: {url}")
response = requests.get(url)
# Check for HTTP errors
response.raise_for_status()
# Return JSON data
return response.json()
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error occurred: {http_err}")
sys.exit(1)
except requests.exceptions.RequestException as req_err:
logger.error(f"Request error occurred: {req_err}")
sys.exit(1)
except Exception as err:
logger.error(f"An unexpected error occurred: {err}")
sys.exit(1)
def save_json(data, filename):
"""Saves JSON data to a file."""
try:
# Ensure directory exists
if not os.path.exists('data'):
os.makedirs('data')
# Save data to file
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
logger.info(f"Data saved to {filename}")
except Exception as e:
logger.error(f"Error saving JSON data: {e}")
sys.exit(1)
def parse_arguments():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Fetch enforcement data from the NYSDPS API.")
# Add arguments for results per page
parser.add_argument('--results_per_page', type=int, default=100, help="Number of results per page (default: 100)")
# Parse the arguments
args = parser.parse_args()
return args.results_per_page
def generate_token(username, password, url="https://www.arcgis.com/sharing/rest/generateToken"):
"""Generates an authentication token."""
payload = {
'f': 'json',
'username': username,
'password': password,
'client': 'referer',
'referer': 'https://www.arcgis.com',
'expiration': '120'
}
headers = {}
try:
response = requests.post(url, headers=headers, data=payload)
response.raise_for_status() # Raise an error for bad status codes
token = response.json()['token']
logger.info("Token generated successfully.")
return token
except requests.exceptions.RequestException as e:
logger.error(f"Error generating token: {e}")
sys.exit(1)
def truncate(token, hostname, instance, fs, layer, secure=True):
"""Truncate the specified layer in the feature service."""
protocol = 'https://' if secure else 'http://'
url = f"{protocol}{hostname}/{instance}/arcgis/rest/admin/services/{fs}/FeatureServer/{layer}/truncate?token={token}&async=true&f=json"
try:
# Attempt the POST request
logging.info(f"Attempting to truncate layer {layer} on {hostname}...")
response = requests.post(url, timeout=30)
# Check for HTTP errors
response.raise_for_status() # Raise an exception for HTTP errors (4xx, 5xx)
# Check for any known error in the response content (e.g., ArcGIS error codes)
if response.status_code == 200:
result = response.json()
if 'error' in result:
logging.error(f"Error truncating layer: {result['error']}")
return None
logging.info(f"Successfully truncated layer: {protocol}{hostname}/{instance}/arcgis/rest/admin/services/{fs}/FeatureServer/{layer}.")
return result
else:
logging.error(f"Unexpected response: {response.status_code} - {response.text}")
return None
except requests.exceptions.RequestException as e:
# Catch network-related errors, timeouts, etc.
logging.error(f"Request failed: {e}")
return None
except Exception as e:
# Catch any other unexpected errors
logging.error(f"An unexpected error occurred: {e}")
return None
def add_features(token, hostname, instance, fs, layer, aggregated_data, secure=True):
"""Add features to a feature service."""
protocol = 'https://' if secure else 'http://'
url = f"{protocol}{hostname}/{instance}/arcgis/rest/services/{fs}/FeatureServer/{layer}/addFeatures?token={token}&rollbackOnFailure=true&f=json"
logger.info(f"Attempting to add features on {protocol}{hostname}/{instance}/arcgis/rest/services/{fs}/FeatureServer/{layer}...")
# Prepare features data as the payload
features_json = json.dumps(aggregated_data) # Convert aggregated data to JSON string
features_encoded = urllib.parse.quote(features_json) # URL-encode the JSON string
# Construct the URL-encoded payload with 'features=' and the URL-encoded data
payload = f"features={features_encoded}"
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
try:
response = requests.post(url, headers=headers, data=payload, timeout=180)
response.raise_for_status() # Raise an error for bad status codes
logger.info("Features added successfully.")
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Request error: {e}")
return {'error': str(e)}
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON response: {e}")
return {'error': 'Invalid JSON response'}
def main():
"""Main entry point for the script."""
# Parse command-line arguments
results_per_page = parse_arguments()
load_dotenv("753DataSync.env")
api_url = os.getenv('API_URL')
# Generate the token
username = os.getenv('AGOL_USER')
password = os.getenv('AGOL_PASSWORD')
token = generate_token(username, password)
# Set ArcGIS host details
hostname = os.getenv('HOSTNAME')
instance = os.getenv('INSTANCE')
fs = os.getenv('FS')
layer = os.getenv('LAYER')
# Truncate the layer before adding new features
truncate(token, hostname, instance, fs, layer)
all_data = []
page_number = 1
while True:
# Fetch data from the API
data = fetch_data(api_url, page_number, results_per_page)
# Append features data to the aggregated list
all_data.extend(data) # Data is now a list of features
# Generate filename with timestamp for the individual page
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
page_filename = f"data/enforcement_page_{page_number}_results_{results_per_page}_{timestamp}.json"
# Save individual page data
save_json(data, page_filename)
# Check if the number of records is less than the results_per_page, indicating last page
if len(data) < results_per_page:
logger.info("No more data to fetch, stopping pagination.")
break
page_number += 1
# Prepare aggregated data
aggregated_data = all_data # Just use the collected features directly
# Save aggregated data to a single JSON file
aggregated_filename = f"data/aggregated_enforcement_results_{timestamp}.json"
save_json(aggregated_data, aggregated_filename)
# Add the features to the feature layer
response = add_features(token, hostname, instance, fs, layer, aggregated_data)
logger.info(f"Add features response: {json.dumps(response, indent=2)}")
if __name__ == "__main__":
main()