# 753DataSync — fetches enforcement data from the NYSDPS API and syncs it
# into an ArcGIS Online feature layer (truncate + addFeatures).
# Standard library
import argparse
import json
import logging
import os
import sys
import time
import urllib.parse
from datetime import datetime, timedelta

# Third-party
import requests
from dotenv import load_dotenv

# Load environment variables from the sync configuration file before the
# os.getenv() calls below read them.
load_dotenv("753DataSync.env")
# Configuration
# BASE_URL is a template filled as: {api_url}/{page_number}/{results_per_page}
BASE_URL = "{}/{}/{}"

# Normalize the requested log level up front and fall back to INFO for
# unknown values. (The original if/elif chain defaulted the *logger* to INFO
# but still passed the raw value to getattr(logging, log_level) for the
# handlers, which raised AttributeError and crashed the script on an invalid
# LOG_LEVEL.)
log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
if log_level not in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
    log_level = 'INFO'
_level = getattr(logging, log_level)

# Setup logging on the root logger so messages from libraries propagate to
# the same handlers.
logger = logging.getLogger()
logger.setLevel(_level)

# File handler
file_handler = logging.FileHandler('753DataSync.log')
file_handler.setLevel(_level)

# Stream handler (console output)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(_level)

# Log format shared by both handlers
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)

# Add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
def fetch_data(api_url, page_number, results_per_page):
    """Fetches data from the API and returns the response.

    Builds the request URL from BASE_URL and exits the process (status 1)
    on any request or decoding failure.
    """
    url = BASE_URL.format(api_url, page_number, results_per_page)

    try:
        logger.info(f"Making request to: {url} with page_number={page_number} and results_per_page={results_per_page}")
        resp = requests.get(url)

        # Surface HTTP 4xx/5xx responses as HTTPError.
        resp.raise_for_status()

        logger.info(f"Successfully fetched data from {url}. Status code: {resp.status_code}.")
        logger.debug(f"GET request to {url} completed with status code {resp.status_code}. "
                     f"Response time: {resp.elapsed.total_seconds()} seconds.")

        # Decode and hand back the JSON payload.
        return resp.json()

    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred while fetching data from {url}: {http_err}")
        sys.exit(1)
    except requests.exceptions.RequestException as req_err:
        logger.error(f"Request error occurred while fetching data from {url}: {req_err}")
        sys.exit(1)
    except Exception as err:
        # Catch-all (e.g. JSON decode errors) — log with traceback and bail.
        logger.exception(f"An unexpected error occurred while fetching data from {url}: {err}")
        sys.exit(1)
def save_json(data, filename):
    """Saves JSON data to a file.

    Args:
        data: Any JSON-serializable object.
        filename: Destination path; its parent directory is created if
            missing (covers the original hard-coded 'data' directory).

    Exits the process (status 1) on any failure.
    """
    try:
        # Ensure the destination directory exists.
        directory = os.path.dirname(filename)
        if directory and not os.path.exists(directory):
            os.makedirs(directory)
            logger.info(f"Directory '{directory}' created.")

        # Save data to file
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

        # Restored the {filename} placeholder that had been garbled to a
        # literal "(unknown)" in these log messages.
        logger.info(f"Data successfully saved to {filename}")

    except OSError as e:
        # IOError is an alias of OSError in Python 3, so one handler covers
        # both of the original duplicated except clauses.
        logger.error(f"OS error occurred while saving JSON data to {filename}: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error occurred while saving JSON data to {filename}: {e}")
        sys.exit(1)
def parse_arguments():
    """Parse command-line arguments and return the requested page size."""
    parser = argparse.ArgumentParser(description="Fetch enforcement data from the NYSDPS API.")
    parser.add_argument('--results_per_page', type=int, default=100,
                        help="Number of results per page (default: 100)")
    # Only the page size is needed by the caller, so return it directly.
    return parser.parse_args().results_per_page
def generate_token(username, password, url="https://www.arcgis.com/sharing/rest/generateToken"):
    """Generates an authentication token.

    Requests a referer-scoped token from the ArcGIS Online sharing API and
    returns it; exits the process (status 1) on any failure.
    """
    payload = {
        'f': 'json',
        'username': username,
        'password': password,
        'client': 'referer',
        'referer': 'https://www.arcgis.com',
        'expiration': '120',
    }
    headers = {}

    try:
        logger.info(f"Generating token for username '{username}' using URL: {url}")
        resp = requests.post(url, headers=headers, data=payload)

        # Log the request status and response time
        logger.debug(f"POST request to {url} completed with status code {resp.status_code}. "
                     f"Response time: {resp.elapsed.total_seconds()} seconds.")

        resp.raise_for_status()  # HTTP 4xx/5xx -> HTTPError

        # Guard clause: a missing/empty token is fatal.
        token = resp.json().get('token')
        if not token:
            logger.error("Token not found in the response.")
            sys.exit(1)

        logger.info("Token generated successfully.")
        return token

    except requests.exceptions.RequestException as e:
        logger.error(f"Error generating token for username '{username}': {e}")
        sys.exit(1)
    except KeyError as e:
        logger.error(f"Error extracting token from the response: Missing key {e}")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Unexpected error generating token for username '{username}': {e}")
        sys.exit(1)
def truncate(token, hostname, instance, fs, layer, secure=True):
    """Truncate the specified layer in the feature service.

    Calls the asynchronous admin truncate endpoint. Returns the parsed JSON
    result on success, or None on any failure (callers decide whether to
    continue).
    """
    protocol = 'https://' if secure else 'http://'
    # NOTE(review): the token travels (and is logged below) as a URL query
    # parameter; consider moving it to the POST body to keep it out of
    # server/debug logs.
    url = f"{protocol}{hostname}/{instance}/arcgis/rest/admin/services/{fs}/FeatureServer/{layer}/truncate?token={token}&async=true&f=json"

    try:
        # Use the configured module logger (not the logging module-level
        # functions) so this block matches the rest of the file's handlers
        # and format.
        logger.info(f"Attempting to truncate layer {layer} on {hostname}...")

        # Debug logging for the URL being used
        logger.debug(f"Truncate URL: {url}")

        response = requests.post(url, timeout=30)

        # Log response time
        logger.debug(f"POST request to {url} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")

        # Check for HTTP errors
        response.raise_for_status()  # Raise an exception for HTTP errors (4xx, 5xx)

        if response.status_code == 200:
            result = response.json()
            # ArcGIS reports many failures as HTTP 200 with an 'error' body.
            if 'error' in result:
                logger.error(f"Error truncating layer {layer}: {result['error']}")
                return None
            logger.info(f"Successfully truncated layer: {protocol}{hostname}/{instance}/arcgis/rest/admin/services/{fs}/FeatureServer/{layer}.")
            return result
        else:
            logger.error(f"Unexpected response for layer {layer}: {response.status_code} - {response.text}")
            return None

    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out while truncating layer {layer}: {e}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed while truncating layer {layer}: {e}")
        return None
    except Exception as e:
        logger.error(f"An unexpected error occurred while truncating layer {layer}: {e}")
        return None
def add_features(token, hostname, instance, fs, layer, aggregated_data, secure=True):
    """Add features to a feature service.

    Posts the aggregated feature list to the layer's addFeatures endpoint
    with rollbackOnFailure=true. Returns the parsed JSON response, or a
    {'error': ...} dict on failure.
    """
    protocol = 'https://' if secure else 'http://'
    url = f"{protocol}{hostname}/{instance}/arcgis/rest/services/{fs}/FeatureServer/{layer}/addFeatures?token={token}&rollbackOnFailure=true&f=json"

    logger.info(f"Attempting to add features to {protocol}{hostname}/{instance}/arcgis/rest/services/{fs}/FeatureServer/{layer}...")

    # Serialize the features once; let requests perform the form
    # URL-encoding instead of hand-building the body with
    # urllib.parse.quote — the wire format is equivalent and this avoids
    # encoding mistakes.
    features_json = json.dumps(aggregated_data)
    payload = {'features': features_json}

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    try:
        # Log request details (but avoid logging sensitive data)
        logger.debug(f"Request URL: {url}")
        logger.debug(f"Payload size: {len(features_json)} characters")

        response = requests.post(url, headers=headers, data=payload, timeout=180)

        # Log the response time and status code
        logger.debug(f"POST request to {url} completed with status code {response.status_code}. "
                     f"Response time: {response.elapsed.total_seconds()} seconds.")

        response.raise_for_status()  # Raise an error for bad status codes

        logger.info("Features added successfully.")

        # Log any successful response details
        if response.status_code == 200:
            logger.debug(f"Response JSON size: {len(response.text)} characters.")

        return response.json()

    except requests.exceptions.Timeout as e:
        logger.error(f"Request timed out while adding features: {e}")
        return {'error': 'Request timed out'}

    except requests.exceptions.RequestException as e:
        logger.error(f"Request error occurred while adding features: {e}")
        return {'error': str(e)}

    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON response while adding features: {e}")
        return {'error': 'Invalid JSON response'}

    except Exception as e:
        logger.error(f"An unexpected error occurred while adding features: {e}")
        return {'error': str(e)}
def main():
    """Main entry point for the script.

    Parses arguments, authenticates against ArcGIS Online, truncates the
    target layer, pages through the source API aggregating results, saves
    them locally, and pushes them to the feature layer.
    """
    start_time = time.time()

    try:
        logger.info("Starting script execution.")

        # Parse command-line arguments
        results_per_page = parse_arguments()
        logger.info(f"Parsed arguments: results_per_page={results_per_page}")

        # Load environment variables
        logger.info("Loading environment variables.")
        load_dotenv("753DataSync.env")
        api_url = os.getenv('API_URL')
        if not api_url:
            logger.error("API_URL environment variable not found.")
            return

        # Generate the token
        username = os.getenv('AGOL_USER')
        password = os.getenv('AGOL_PASSWORD')
        if not username or not password:
            logger.error("Missing AGOL_USER or AGOL_PASSWORD in environment variables.")
            return
        token = generate_token(username, password)

        # Set ArcGIS host details
        hostname = os.getenv('HOSTNAME')
        instance = os.getenv('INSTANCE')
        fs = os.getenv('FS')
        layer = os.getenv('LAYER')

        # Truncate the layer before adding new features. The original flow
        # continued regardless of the outcome, so only warn on failure.
        if truncate(token, hostname, instance, fs, layer) is None:
            logger.warning(f"Truncate of layer {layer} may not have succeeded; continuing.")

        all_data = []
        page_number = 1
        # Initialize before the loop so the aggregated filename below cannot
        # hit a NameError if the first iteration fails before assigning it.
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

        while True:
            try:
                # Fetch data from the API
                data = fetch_data(api_url, page_number, results_per_page)

                # Append features data to the aggregated list
                all_data.extend(data)

                timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                page_filename = f"data/enforcement_page_{page_number}_results_{results_per_page}_{timestamp}.json"

                # Save individual page data if in DEBUG mode
                if log_level == 'DEBUG':
                    save_json(data, page_filename)

                # A short page means this was the last one.
                if len(data) < results_per_page:
                    logger.info("No more data to fetch, stopping pagination.")
                    break

                page_number += 1
            except Exception as e:
                logger.error(f"Error fetching or saving data for page {page_number}: {e}", exc_info=True)
                break

        # Prepare aggregated data
        aggregated_data = all_data

        # Save aggregated data
        aggregated_filename = f"data/aggregated_enforcement_results_{timestamp}.json"
        logger.info(f"Saving aggregated data to {aggregated_filename}.")
        save_json(aggregated_data, aggregated_filename)

        # Add the features to the feature layer and surface any reported error.
        response = add_features(token, hostname, instance, fs, layer, aggregated_data)
        if isinstance(response, dict) and 'error' in response:
            logger.error(f"Adding features reported an error: {response['error']}")

    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        return
    finally:
        elapsed_time = timedelta(seconds=time.time() - start_time)
        logger.info(f"Script execution completed in {str(elapsed_time)}.")


if __name__ == "__main__":
    main()