Module palletjack.utils
Utility classes and methods that are used internally throughout palletjack. Many are exposed publicly in case they are useful elsewhere in a client's code.
Expand source code
"""Utility classes and methods that are used internally throughout palletjack. Many are exposed publicly in case they are useful elsewhere in a client's code.
"""
import datetime
import importlib
import logging
import random
import re
import sys
import warnings
from math import floor
from pathlib import Path
from time import sleep
import arcgis
import geopandas as gpd
import pandas as pd
import pygsheets
import pyogrio
import requests
from palletjack.errors import IntFieldAsFloatError, TimezoneAwareDatetimeError
module_logger = logging.getLogger(__name__)
RETRY_MAX_TRIES = 3
RETRY_DELAY_TIME = 2
def retry(worker_method, *args, **kwargs):
"""Allows you to retry a function/method to overcome network jitters or other transient errors.
Retries worker_method RETRY_MAX_TRIES times (for a total of n+1 tries, including the initial attempt), pausing
RETRY_DELAY_TIME**n seconds before the nth retry (exponential backoff). Any arguments for worker_method can be passed in as additional
parameters to retry() following worker_method: retry(foo_method, arg1, arg2, keyword_arg=3).
RETRY_MAX_TRIES and RETRY_DELAY_TIME default to 3 tries and 2 seconds, but can be overridden by setting the
palletjack.utils.RETRY_MAX_TRIES and palletjack.utils.RETRY_DELAY_TIME constants in the client script.
Args:
worker_method (callable): The name of the method to be retried (minus the calling parens)
Raises:
error: The final error that causes worker_method to fail after RETRY_MAX_TRIES retries
Returns:
various: The value(s) returned by worker_method
"""
tries = 1
max_tries = RETRY_MAX_TRIES
delay = RETRY_DELAY_TIME #: in seconds
#: this inner function (closure? almost-closure?) allows us to keep track of tries without passing it as an arg
def _inner_retry(worker_method, *args, **kwargs):
nonlocal tries
try:
return worker_method(*args, **kwargs)
#: ArcGIS API for Python loves throwing bog-standard Exceptions, so we can't narrow this down further
except Exception as error:
if tries <= max_tries: #pylint: disable=no-else-return
wait_time = delay**tries
module_logger.debug(
'Exception "%s" thrown on "%s". Retrying after %s seconds...', error, worker_method, wait_time
)
sleep(wait_time)
tries += 1
return _inner_retry(worker_method, *args, **kwargs)
else:
raise error
return _inner_retry(worker_method, *args, **kwargs)
def rename_columns_for_agol(columns):
"""Replace special characters and spaces with '_' to match AGOL field names
Args:
columns (iter): The new columns to be renamed
Returns:
Dict: Mapping {'original name': 'cleaned_name'}
"""
rename_dict = {}
for column in columns:
no_specials = re.sub(r'[^a-zA-Z0-9_]', '_', column)
match = re.match(r'(^[0-9_]+)', no_specials)
if match:
number = match.groups()[0]
rename_dict[column] = no_specials.removeprefix(number) + number
continue
rename_dict[column] = no_specials
return rename_dict
#: Unused?
def check_fields_match(featurelayer, new_dataframe):
"""Make sure new data doesn't have any extra fields, warn if it doesn't contain all live fields
Args:
featurelayer (arcgis.features.FeatureLayer): Live data
new_dataframe (pd.DataFrame): New data
Raises:
RuntimeError: If new data contains a field not present in the live data
"""
live_fields = {field['name'] for field in featurelayer.properties.fields}
new_fields = set(new_dataframe.columns)
#: Remove SHAPE field from set (live "featurelayer.properties['fields']" does not expose the 'SHAPE' field)
try:
new_fields.remove('SHAPE')
except KeyError:
pass
new_dif = new_fields - live_fields
live_dif = live_fields - new_fields
if new_dif:
raise RuntimeError(
f'New dataset contains the following fields that are not present in the live dataset: {new_dif}'
)
if live_dif:
module_logger.warning(
'New dataset does not contain the following fields that are present in the live dataset: %s', live_dif
)
#: Unused?
def check_index_column_in_feature_layer(featurelayer, index_column):
"""Ensure index_column is present for any future operations
Args:
featurelayer (arcgis.features.FeatureLayer): The live feature layer
index_column (str): The index column meant to link new and live data
Raises:
RuntimeError: If index_column is not in featurelayer's fields
"""
featurelayer_fields = [field['name'] for field in featurelayer.properties.fields]
if index_column not in featurelayer_fields:
raise RuntimeError(f'Index column {index_column} not found in feature layer fields {featurelayer_fields}')
#: unused?
def rename_fields(dataframe, field_mapping):
"""Rename fields based on field_mapping
Args:
dataframe (pd.DataFrame): Dataframe with columns to be renamed
field_mapping (dict): Mapping of existing field names to new names
Raises:
ValueError: If an existing name from field_mapping is not found in dataframe.columns
Returns:
pd.DataFrame: Dataframe with renamed fields
"""
for original_name in field_mapping.keys():
if original_name not in dataframe.columns:
raise ValueError(f'Field {original_name} not found in dataframe.')
renamed_df = dataframe.rename(columns=field_mapping)
return renamed_df
#: This isn't used anymore... but it feels like a shame to lose it.
def build_sql_in_list(series):
"""Generate a properly formatted list to be a target for a SQL 'IN' clause
Args:
series (pd.Series): Series of values to be included in the 'IN' list
Returns:
str: Values formatted as (1, 2, 3) for numbers or ('a', 'b', 'c') for anything else
"""
if pd.api.types.is_numeric_dtype(series):
return f'({", ".join(series.astype(str))})'
else:
quoted_values = [f"'{value}'" for value in series]
return f'({", ".join(quoted_values)})'
#: Unused in v3, but keeping for "unique constraint" info.
def check_field_set_to_unique(featurelayer, field_name):
"""Makes sure field_name has a "unique constraint" in AGOL, which allows it to be used for .append upserts
Args:
featurelayer (arcgis.features.FeatureLayer): The target feature layer
field_name (str): The AGOL-valid field name to check
Raises:
RuntimeError: If the field is not unique (or if it's indexed but not unique)
"""
fields = [field['fields'] for field in featurelayer.properties.indexes]
if field_name not in fields:
raise RuntimeError(f'{field_name} does not have a "unique constraint" set within the feature layer')
for field in featurelayer.properties.indexes:
if field['fields'] == field_name:
if not field['isUnique']:
raise RuntimeError(f'{field_name} does not have a "unique constraint" set within the feature layer')
class Geocoding:
"""Methods for geocoding an address
"""
@staticmethod
def geocode_addr(street, zone, api_key, rate_limits, **api_args):
"""Geocode an address through the UGRC Web API geocoder
Invalid results are returned with an x,y of 0,0, a score of 0.0, and a match address of 'No Match'
Args:
street (str): The street address
zone (str): The zip code or city
api_key (str): API key obtained from developer.mapserv.utah.gov
rate_limits (Tuple <float>): A lower and upper bound in seconds for pausing between API calls. Defaults to
(0.015, 0.03)
**api_args (dict): Keyword arguments to be passed as parameters in the API GET call. The API key will be
added to this dict.
Returns:
tuple: The match's x coordinate, y coordinate, score, and match address
"""
sleep(random.uniform(rate_limits[0], rate_limits[1]))
url = f'https://api.mapserv.utah.gov/api/v1/geocode/{street}/{zone}'
api_args['apiKey'] = api_key
try:
geocode_result_dict = retry(Geocoding._geocode_api_call, url, api_args)
except Exception as error:
module_logger.error(error)
return (0, 0, 0., 'No API response')
return (
geocode_result_dict['location']['x'],
geocode_result_dict['location']['y'],
geocode_result_dict['score'],
geocode_result_dict['matchAddress'],
)
@staticmethod
def _geocode_api_call(url, api_args):
"""Makes a requests.get call to the geocoding API.
Meant to be called through a retry wrapper so that the RuntimeErrors get tried again a couple times before
finally raising the error.
Args:
url (str): Base url for GET request
api_args (dict): Dictionary of URL parameters
Raises:
RuntimeError: If the server does not return a response and requests.get returns a falsy object.
RuntimeError: If the server returns a status code other than 200 or 404
Returns:
dict: The 'results' dictionary of the response json (location, score, and matchAddress)
"""
response = requests.get(url, params=api_args)
#: The server times out and doesn't respond
if response is None:
module_logger.debug('GET call did not return a response')
raise RuntimeError('No response from GET; request timeout?')
#: The point does geocode
if response.status_code == 200:
return response.json()['result']
#: The point doesn't geocode
if response.status_code == 404:
return {
'location': {
'x': 0,
'y': 0
},
'score': 0.,
'matchAddress': 'No Match',
}
#: If we haven't returned, raise an error to trigger _retry
raise RuntimeError(f'Did not receive a valid geocoding response; status code: {response.status_code}')
@staticmethod
def validate_api_key(api_key):
"""Check to see if a Web API key is valid by geocoding a single, known address point
Args:
api_key (str): API Key
Raises:
RuntimeError: If there was a network or other error attempting to geocode the known point
ValueError: If the API responds with an invalid key message
UserWarning: If the API responds with some other abnormal result
"""
url = 'https://api.mapserv.utah.gov/api/v1/geocode/326 east south temple street/slc'
try:
response = retry(requests.get, url=url, params={'apiKey': api_key})
except Exception as error:
raise RuntimeError(
'Could not determine key validity; check your API key and/or network connection'
) from error
response_json = response.json()
if response_json['status'] == 200:
return
if response_json['status'] == 400 and 'Invalid API key' in response_json['message']:
raise ValueError(f'API key validation failed: {response_json["message"]}')
warnings.warn(f'Unhandled API key validation response {response_json["status"]}: {response_json["message"]}')
def calc_modulus_for_reporting_interval(n, split_value=500):
"""Calculate a number that can be used as a modulus for splitting n up into 10 or 20 intervals, depending on
split_value.
Args:
n (int): The number to divide into intervals
split_value (int, optional): The point at which it should create 20 intervals instead of 10. Defaults to 500.
Returns:
int: Number to be used as modulus to compare to 0 in reporting code
"""
if n <= 10:
return 1
if n < split_value:
return floor(n / 10)
return floor(n / 20)
def authorize_pygsheets(credentials):
"""Authenticate pygsheets using either a service file or google.auth.credentials.Credentials object.
Requires either the path to a service account .json file that has access to the files in question or a `google.
auth.credentials.Credentials` object. Calling `google.auth.default()` in a Google Cloud Function will give you a
tuple of a `Credentials` object and the project id. You can use this `Credentials` object to authorize pygsheets as
the same account the Cloud Function is running under.
Tries first to load credentials from file; if this fails tries credentials directly as a custom_credential.
Args:
credentials (str or google.auth.credentials.Credentials): Path to the service file OR credentials object
obtained from google.auth.default() within a cloud function.
Raises:
RuntimeError: If both authorization method attempts fail
Returns:
pygsheets.Client: Authorized pygsheets client
"""
try:
return pygsheets.authorize(service_file=credentials)
except (FileNotFoundError, TypeError) as err:
module_logger.debug(err)
module_logger.debug('Credentials file not found, trying as environment variable')
try:
return pygsheets.authorize(custom_credentials=credentials)
except Exception as err:
raise RuntimeError('Could not authenticate to Google API') from err
def sedf_to_gdf(dataframe):
"""Convert an Esri Spatially Enabled DataFrame to a GeoPandas GeoDataFrame
Args:
dataframe (pd.DataFrame.spatial): Esri spatially enabled dataframe to convert
Returns:
GeoPandas.DataFrame: dataframe converted to GeoDataFrame
"""
gdf = gpd.GeoDataFrame(dataframe, geometry=dataframe.spatial.name)
try:
gdf.set_crs(dataframe.spatial.sr['latestWkid'], inplace=True)
except KeyError:
gdf.set_crs(dataframe.spatial.sr['wkid'], inplace=True)
return gdf
def save_feature_layer_to_gdb(feature_layer, directory):
"""Save a feature_layer to a gdb for safety as backup.gdb/{layer name}_{today's date}
Args:
feature_layer (arcgis.features.FeatureLayer): The FeatureLayer object to save to disk.
directory (str or Path): The directory to save the data to.
Returns:
Path: The full path to the output file, named with the layer name and today's date.
"""
module_logger.debug('Downloading existing data...')
dataframe = feature_layer.query().sdf
if dataframe.empty:
return f'No data to save in feature layer {feature_layer.properties.name}'
gdf = sedf_to_gdf(dataframe)
out_path = Path(directory, 'backup.gdb')
out_layer = f'{feature_layer.properties.name}_{datetime.date.today().strftime("%Y_%m_%d")}'
module_logger.debug('Saving existing data to %s', out_path)
try:
gdf.to_file(out_path, layer=out_layer, engine='pyogrio', driver='OpenFileGDB')
except pyogrio.errors.DataSourceError as error:
raise ValueError(
f'Error writing {out_layer} to {out_path}. Verify {Path(directory)} exists and is writable.'
) from error
return out_path
class FieldChecker:
"""Check the fields of a new dataframe against live data. Each method will raise errors if its checks fail.
Provides the check_fields class method to run all the checks in one call without having to create an object.
"""
@classmethod
def check_fields(cls, live_data_properties, new_dataframe, fields, add_oid):
"""Run all the field checks, raising errors and warnings where they fail.
Check individual method docstrings for details and specific errors raised.
Args:
live_data_properties (dict): FeatureLayer.properties of live data
new_dataframe (pd.DataFrame): New data to be checked
fields (List[str]): Fields to check
add_oid (bool): Add OBJECTID to fields if it's not already present (for operations that are dependent on
OBJECTID, such as upsert)
"""
field_checker = cls(live_data_properties, new_dataframe)
field_checker.check_fields_present(fields, add_oid=add_oid)
field_checker.check_live_and_new_field_types_match(fields)
field_checker.check_for_non_null_fields(fields)
field_checker.check_field_length(fields)
# field_checker.check_srs_wgs84()
field_checker.check_nullable_ints_shapely()
def __init__(self, live_data_properties, new_dataframe):
"""
Args:
live_data_properties (dict): FeatureLayer.properties of live data
new_dataframe (pd.DataFrame): New data to be checked
"""
self.live_data_properties = live_data_properties
self.fields_dataframe = pd.DataFrame(live_data_properties.fields)
self.new_dataframe = new_dataframe
def check_live_and_new_field_types_match(self, fields):
"""Raise an error if the field types of the live and new data don't match.
Uses a dictionary mapping Esri field types to pandas dtypes. If 'SHAPE' is included in the fields, it calls
_check_geometry_types to verify the spatial types are compatible.
Args:
fields (List[str]): Fields to be updated
Raises:
ValueError: If the field types or spatial types are incompatible, the new data has multiple geometry types,
or the new data is not a valid spatially-enabled dataframe.
NotImplementedError: If the live data has a field that has not yet been mapped to a pandas dtype.
"""
#: Converting dtypes to str and comparing seems to be the only way to break out into shorts and longs, singles
#: and doubles. Otherwise, checking subclass is probably more pythonic.
short_ints = ['uint8', 'uint16', 'int8', 'int16']
long_ints = ['int', 'uint32', 'uint64', 'int32', 'int64']
#: Leaving the commented types here for future implementation if necessary
esri_to_pandas_types_mapping = {
'esriFieldTypeInteger': ['int'] + short_ints + long_ints,
'esriFieldTypeSmallInteger': short_ints,
'esriFieldTypeDouble': ['float', 'float32', 'float64'],
'esriFieldTypeSingle': ['float32'],
'esriFieldTypeString': ['str', 'object', 'string'],
'esriFieldTypeDate': ['datetime64[ns]'],
'esriFieldTypeGeometry': ['geometry'],
'esriFieldTypeOID': ['int'] + short_ints + long_ints,
# 'esriFieldTypeBlob': [],
'esriFieldTypeGlobalID': ['str', 'object', 'string'],
# 'esriFieldTypeRaster': [],
'esriFieldTypeGUID': ['str', 'object', 'string'],
# 'esriFieldTypeXML': [],
}
#: geometry checking gets its own function
if 'SHAPE' in fields:
self._check_geometry_types()
fields.remove('SHAPE')
fields_to_check = self.fields_dataframe[self.fields_dataframe['name'].isin(fields)].set_index('name')
invalid_fields = []
int_fields_as_floats = []
datetime_fields_with_timezone = []
for field in fields:
#: check against the str.lower to catch normal dtypes (int64) and the new, pd.NA-aware dtypes (Int64)
new_dtype = str(self.new_dataframe[field].dtype).lower()
live_type = fields_to_check.loc[field, 'type']
try:
if new_dtype not in esri_to_pandas_types_mapping[live_type]:
invalid_fields.append((field, live_type, str(self.new_dataframe[field].dtype)))
if new_dtype in ['float', 'float32', 'float64'
] and live_type in ['esriFieldTypeInteger', 'esriFieldTypeSmallInteger']:
int_fields_as_floats.append(field)
if 'datetime64' in new_dtype and new_dtype != 'datetime64[ns]' and live_type == 'esriFieldTypeDate':
datetime_fields_with_timezone.append(field)
except KeyError:
# pylint: disable-next=raise-missing-from
raise NotImplementedError(f'Live field "{field}" type "{live_type}" not yet mapped to a pandas dtype')
if invalid_fields:
if int_fields_as_floats:
raise IntFieldAsFloatError(
f'Field type incompatibilities (field, live type, new type): {invalid_fields}\n' \
'Check the following int fields for null/np.nan values and convert to panda\'s nullable int '\
f'dtype: {", ".join(int_fields_as_floats)}'
)
if datetime_fields_with_timezone:
raise TimezoneAwareDatetimeError(
f'Field type incompatibilities (field, live type, new type): {invalid_fields}\n' \
'Check the following datetime fields for timezone aware dtypes values and convert to '\
'timezone-naive dtypes using pd.to_datetime(df[\'field\']).dt.tz_localize(None): '\
f'{", ".join(datetime_fields_with_timezone)}'
)
raise ValueError(f'Field type incompatibilities (field, live type, new type): {invalid_fields}')
def _check_geometry_types(self):
"""Raise an error if the live and new data geometry types are incompatible.
Args:
live_data_properties (dict): FeatureLayer.properties of live data
new_dataframe (pd.DataFrame): New data to be added/updated
Raises:
ValueError: If the new data is not a valid spatially-enabled dataframe, has multiple geometry types, or has
a geometry type that doesn't match the live data.
"""
esri_to_sedf_geometry_mapping = {
'esriGeometryPoint': 'point',
'esriGeometryMultipoint': 'multipoint',
'esriGeometryPolyline': 'polyline',
'esriGeometryPolygon': 'polygon',
'esriGeometryEnvelope': 'envelope',
}
if 'SHAPE' not in self.new_dataframe.columns:
raise ValueError('New dataframe does not have a SHAPE column')
if self.new_dataframe['SHAPE'].isna().any():
raise ValueError(
f'New dataframe has missing geometries at index {list(self.new_dataframe[self.new_dataframe["SHAPE"].isna()].index)}'
)
live_geometry_type = self.live_data_properties.geometryType
new_geometry_types = self.new_dataframe.spatial.geometry_type
if len(new_geometry_types) > 1:
raise ValueError('New dataframe has multiple geometry types')
if esri_to_sedf_geometry_mapping[live_geometry_type] != new_geometry_types[0].lower():
raise ValueError(
f'New dataframe geometry type "{new_geometry_types[0]}" incompatible with live geometry type "{live_geometry_type}"'
)
def check_for_non_null_fields(self, fields):
"""Raise an error if the new data contains nulls in a field that the live data says is not nullable.
If this error occurs, the client should use pandas fillna() method to replace NaNs/Nones with empty strings or
appropriate nodata values.
Args:
fields (List[str]): Fields to check
Raises:
ValueError: If the new data contains nulls in a field that the live data says is not nullable and doesn't
have a default value.
"""
columns_with_nulls = self.new_dataframe.columns[self.new_dataframe.isna().any()].tolist()
# fields_dataframe = pd.DataFrame(self.live_data_properties['fields'])
non_nullable_live_columns = self.fields_dataframe[
~(self.fields_dataframe['nullable']) &
~(self.fields_dataframe['defaultValue'].astype(bool))]['name'].tolist()
columns_to_check = [column for column in columns_with_nulls if column in fields]
#: If none of the columns have nulls, we don't need to check further
if not columns_to_check:
return
problem_fields = []
for column in columns_to_check:
if column in non_nullable_live_columns:
problem_fields.append(column)
if problem_fields:
raise ValueError(
f'The following fields cannot have null values in the live data but one or more nulls exist in the new data: {", ".join(problem_fields)}'
)
def check_field_length(self, fields):
"""Raise an error if a new data string value is longer than allowed in the live data.
Args:
fields (List[str]): Fields to check
Raises:
ValueError: If the string fields in the new data contain a value longer than the corresponding field in the
live data allows.
"""
if 'length' not in self.fields_dataframe.columns:
module_logger.debug('No fields with length property')
return
length_limited_fields = self.fields_dataframe[
(self.fields_dataframe['type'].isin(['esriFieldTypeString', 'esriFieldTypeGlobalID'])) &
(self.fields_dataframe['length'].astype(bool))]
columns_to_check = length_limited_fields[length_limited_fields['name'].isin(fields)]
for field, live_max_length in columns_to_check[['name', 'length']].to_records(index=False):
new_data_lengths = self.new_dataframe[field].str.len()
new_max_length = new_data_lengths.max()
if new_max_length > live_max_length:
raise ValueError(
f'Row {new_data_lengths.argmax()}, column {field} in new data exceeds the live data max length of {live_max_length}'
)
def check_fields_present(self, fields, add_oid):
"""Raise an error if the fields to be operated on aren't present in either the live or new data.
Args:
fields (List[str]): The fields to be operated on.
add_oid (bool): Add OBJECTID to fields if it's not already present (for operations that are dependent on
OBJECTID, such as upsert)
Raises:
RuntimeError: If any of fields are not in live or new data.
"""
live_fields = set(self.fields_dataframe['name'])
new_fields = set(self.new_dataframe.columns)
working_fields = set(fields)
working_fields.discard('SHAPE') #: The fields from the feature layer properties don't include the SHAPE field.
if add_oid:
working_fields.add('OBJECTID')
live_dif = working_fields - live_fields
new_dif = working_fields - new_fields
error_message = []
if live_dif:
error_message.append(f'Fields missing in live data: {", ".join(live_dif)}')
if new_dif:
error_message.append(f'Fields missing in new data: {", ".join(new_dif)}')
if error_message:
raise RuntimeError('. '.join(error_message))
def check_srs_wgs84(self):
"""Raise an error if the new spatial reference system isn't WGS84 as required by geojson.
Raises:
ValueError: If the new SRS value can't be cast to an int (please log an issue if this occurs)
ValueError: If the new SRS value isn't 4326.
"""
#: If we modify a spatial data frame, sometimes the .sr.wkid property/dictionary becomes {0:number} instead
#: of {'wkid': number}
try:
new_srs = self.new_dataframe.spatial.sr.wkid
except AttributeError:
new_srs = self.new_dataframe.spatial.sr[0]
try:
new_srs = int(new_srs)
except ValueError as error:
raise ValueError('Could not cast new SRS to int') from error
if new_srs != 4326:
raise ValueError(
f'New dataframe SRS {new_srs} is not wkid 4326. Reproject with appropriate transformation.'
)
def check_nullable_ints_shapely(self):
"""Raise a warning if null values occur within nullable integer fields of the dataframe
Apparently due to a convention within shapely, any null values in an integer field are converted to 0.
Raises:
UserWarning: If we're using shapely instead of arcpy, the new dataframe uses nullable int dtypes, and there
is one or more pd.NA values within a nullable int column.
"""
#: Only occurs if client is using shapely instead of arcpy
if importlib.util.find_spec('arcpy'):
return
nullable_ints = {'Int8', 'Int16', 'Int32', 'Int64', 'UInt8', 'UInt16', 'UInt32', 'UInt64'}
nullable_int_columns = [
column for column in self.new_dataframe.columns if str(self.new_dataframe[column].dtype) in nullable_ints
]
columns_with_nulls = [column for column in nullable_int_columns if self.new_dataframe[column].isnull().any()]
if columns_with_nulls:
warnings.warn(
'The following columns have null values that will be replaced by 0 due to shapely conventions: '\
f'{", ".join(columns_with_nulls)}'
)
def get_null_geometries(feature_layer_properties):
"""Generate placeholder geometries near 0, 0 with type based on provided feature layer properties dictionary.
Args:
feature_layer_properties (dict): .properties from a feature layer item, contains 'geometryType' key
Raises:
NotImplementedError: If we get a geometryType we haven't implemented a null-geometry generator for
Returns:
arcgis.geometry.Geometry: A geometry object of the corresponding type centered around null island.
"""
# esri_to_sedf_geometry_mapping = {
# 'esriGeometryPoint': 'point',
# 'esriGeometryMultipoint': 'multipoint',
# 'esriGeometryPolyline': 'polyline',
# 'esriGeometryPolygon': 'polygon',
# 'esriGeometryEnvelope': 'envelope',
# }
live_geometry_type = feature_layer_properties.geometryType
if live_geometry_type == 'esriGeometryPoint':
return arcgis.geometry.Point({'x': 0, 'y': 0, 'spatialReference': {'wkid': 4326}}).JSON
if live_geometry_type == 'esriGeometryPolyline':
return arcgis.geometry.Polyline({
'paths': [[[0, 0], [.1, .1], [.2, .2]]],
'spatialReference': {
'wkid': 4326
}
}).JSON
if live_geometry_type == 'esriGeometryPolygon':
return arcgis.geometry.Polygon({
'rings': [[[0, .1], [.1, .1], [.1, 0], [0, 0]]],
'spatialReference': {
'wkid': 4326
}
}).JSON
raise NotImplementedError(f'Null value generator for live geometry type {live_geometry_type} not yet implemented')
class DeleteUtils:
"""Verify Object IDs used for delete operations
"""
@staticmethod
def check_delete_oids_are_ints(oid_list):
"""Raise an error if a list of strings can't be parsed as ints
Args:
oid_list (list[int]): List of Object IDs to delete
Raises:
TypeError: If any of the items in oid_list can't be cast to ints
Returns:
list[int]: oid_list converted to ints
"""
numeric_oids = []
bad_oids = []
for oid in oid_list:
try:
numeric_oids.append(int(oid))
except ValueError:
bad_oids.append(oid)
if bad_oids:
raise TypeError(f'Couldn\'t convert OBJECTID(s) `{bad_oids}` to integer')
return numeric_oids
@staticmethod
def check_for_empty_oid_list(oid_list, numeric_oids):
"""Raise an error if the parsed Object ID list is empty
Args:
oid_list (list[int]): The original list of Object IDs to delete
numeric_oids (list[int]): The cast-to-int Object IDs
Raises:
ValueError: If numeric_oids is empty
"""
if not numeric_oids:
raise ValueError(f'No OBJECTIDs found in {oid_list}')
@staticmethod
def check_delete_oids_are_in_live_data(oid_string, numeric_oids, feature_layer):
"""Warn if a delete Object ID doesn't exist in the live data, return number missing
Args:
oid_string (str): Comma-separated string of delete Object IDs
numeric_oids (list[int]): The parsed and cast-to-int Object IDs
feature_layer (arcgis.features.FeatureLayer): Live FeatureLayer item
Raises:
UserWarning: If any of the Object IDs in numeric_oids don't exist in the live data.
Returns:
int: Number of Object IDs missing from live data
"""
query_results = feature_layer.query(object_ids=oid_string, return_ids_only=True)
query_oids = query_results['objectIds']
oids_not_in_layer = set(numeric_oids) - set(query_oids)
if oids_not_in_layer:
warnings.warn(f'OBJECTIDs {oids_not_in_layer} were not found in the live data')
return len(oids_not_in_layer)
class Chunking:
"""Divide a dataframe into chunks to satisfy upload size requirements for append operation.
"""
@staticmethod
def _ceildiv(num, denom):
"""Perform ceiling division: 5/4 = 2
Args:
num (int or float): Numerator
denom (int or float): Denominator
Returns:
int: The ceiling of num / denom
"""
return -(num // -denom)
@staticmethod
def _chunk_dataframe(dataframe, chunk_size):
"""Divide up a dataframe into a list of dataframes of chunk_size rows
The DataFrames are returned in a list. Elements [:-1] are as large as possible for the number of chunks needed,
while the last gets however many rows of the dataframe are left over. e.g., a 10-row dataframe chunked with a
chunk_size of 3 results in dataframes with 3, 3, 3, and 1 rows.
Args:
dataframe (pd.DataFrame): Input DataFrame
chunk_size (int): The max number of rows for each sub dataframe
Raises:
ValueError: If the dataframe has only a single row and thus can't be chunked smaller
Returns:
list[pd.DataFrame]: A list of dataframes with at most chunk_size rows per dataframe
"""
df_length = len(dataframe)
if df_length == 1:
raise ValueError(
f'Dataframe chunk is only one row (index {dataframe.index[0]}), further chunking impossible'
)
starts = range(0, df_length, chunk_size)
ends = [start + chunk_size if start + chunk_size < df_length else df_length for start in starts]
list_of_dataframes = [dataframe.iloc[start:end] for start, end in zip(starts, ends)]
return list_of_dataframes
@staticmethod
def build_upload_json(dataframe, feature_layer_fields, max_bytes=100_000_000):
"""Create list of geojson strings of spatially-enabled DataFrame, divided into chunks if it exceeds max_bytes
Recursively chunks dataframe to ensure no one chunk is larger than max_bytes. Converts all empty strings in
nullable numeric fields in feature sets created from individual chunks to None prior to converting to geojson to
ensure the field stays numeric.
Args:
dataframe (pd.DataFrame.spatial): Spatially-enabled dataframe to be converted to geojson
feature_layer_fields: All the fields from the feature layer (feature_layer.properties.fields)
max_bytes (int, optional): Maximum size in bytes any one geojson string can be. Defaults to 100000000 (AGOL
text uploads are limited to 100 MB?)
Returns:
list[str]: A list of the dataframe chunks converted to geojson
"""
geojson_size = sys.getsizeof(dataframe.spatial.to_featureset().to_geojson.encode('utf-16'))
module_logger.debug('Initial file size: %s', geojson_size)
chunked_dataframes = Chunking._recursive_dataframe_chunking(dataframe, max_bytes)
chunked_geojsons = [
fix_numeric_empty_strings(chunk.spatial.to_featureset(), feature_layer_fields).to_geojson
for chunk in chunked_dataframes
]
return chunked_geojsons
@staticmethod
def _recursive_dataframe_chunking(dataframe, max_bytes):
"""Break a dataframe into chunks such that their utf-16 encoded geojson sizes don't exceed max_bytes
Divides the dataframe into chunks based on the geojson representation's utf-16-encoded size by calculating the
number of chunks of size > max_bytes needed for the entire file size. It uses this number of chunks to chunk the
dataframe based on rows. Because there can be variability in geojson file size due to attribute lengths
(especially line and polygon geometry sizes), it uses recursion to again chunk each smaller dataframe if needed.
The chunks should (though this is not definitively proven) maintain the sequential order of the features of the original
dataframe. Suppose an initial 10 rows gives us chunks for rows [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]. However,
the second chunk [4, 5, 6] turns out to be too large, so it gets divided into [4, 5] and [6]. The resulting
list of chunks should be [1, 2, 3], [4, 5], [6], [7, 8, 9], [10].
The chunking process will raise an error if it tries to chunk a dataframe with only one row, which means a
single row is larger than max_bytes (usually caused by a large and complex geometry).
Args:
dataframe (pd.DataFrame.spatial): A spatially-enabled dataframe to divide
max_bytes (int): The max utf-16 encoded geojson size for any one chunk
"""
#: Calculate number of chunks needed and the guesstimate max number of rows to achieve that size
geojson_size = sys.getsizeof(dataframe.spatial.to_featureset().to_geojson.encode('utf-16'))
chunks_needed = Chunking._ceildiv(geojson_size, max_bytes)
max_rows = Chunking._ceildiv(len(dataframe), chunks_needed)
#: Chunk the dataframe and then check if the resulting chunks are now within the proper size, calling again on
#: the offending chunks if not
list_of_dataframes = Chunking._chunk_dataframe(dataframe, max_rows)
return_dataframes = [] #: Holds result of valid and recursive chunks
for chunk_dataframe in list_of_dataframes:
chunk_geojson_size = sys.getsizeof(chunk_dataframe.spatial.to_featureset().to_geojson.encode('utf-16'))
if chunk_geojson_size > max_bytes:
return_dataframes.extend(Chunking._recursive_dataframe_chunking(chunk_dataframe, max_bytes))
else:
return_dataframes.append(chunk_dataframe)
return return_dataframes
def fix_numeric_empty_strings(feature_set, feature_layer_fields):
"""Replace empty strings with None for numeric fields that allow nulls
Args:
feature_set (arcgis.features.FeatureSet): Feature set to clean
feature_layer_fields (list[dict]): Fields from the feature layer (feature_layer.properties.fields)
Returns:
arcgis.features.FeatureSet: The feature set with empty strings in nullable numeric fields replaced by None
"""
fields_to_fix = {
field['name']
for field in feature_layer_fields
if field['type'] in ['esriFieldTypeDouble', 'esriFieldTypeInteger', 'esriFieldTypeDate'] and field['nullable']
}
fields_to_fix -= {'Shape__Length', 'Shape__Area'}
for feature in feature_set.features:
for field_name in fields_to_fix:
if feature.attributes[field_name] == '':
feature.attributes[field_name] = None
return feature_set
def chunker(sequence, chunk_size):
"""Break sequence into chunk_size chunks
Args:
sequence (iterable): Any iterable sequence
chunk_size (int): Desired number of elements in each chunk
Returns:
generator: Generator of original sequence broken into chunk_size lists
"""
return (sequence[position:position + chunk_size] for position in range(0, len(sequence), chunk_size))
Functions
def authorize_pygsheets(credentials)
-
Authenticate pygsheets using either a service file or google.auth.credentials.Credentials object.
Requires either the path to a service account .json file that has access to the files in question or a google.auth.credentials.Credentials object. Calling google.auth.default() in a Google Cloud Function will give you a tuple of a Credentials object and the project id. You can use this Credentials object to authorize pygsheets as the same account the Cloud Function is running under.
Tries first to load credentials from file; if this fails, tries credentials directly as a custom_credential.
Args
credentials
:str or google.auth.credentials.Credentials
- Path to the service file OR credentials object obtained from google.auth.default() within a cloud function.
Raises
RuntimeError
- If both authorization method attempts fail
Returns
pygsheets.Client
- Authorized pygsheets client
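A minimal usage sketch; the service-account path is a placeholder and the Cloud Function variant is shown as comments:
from palletjack import utils
#: From a local script, using a hypothetical service-account file:
client = utils.authorize_pygsheets('secrets/service-account.json')
#: From a Google Cloud Function, reusing the runtime's own credentials:
#: import google.auth
#: credentials, _project = google.auth.default()
#: client = utils.authorize_pygsheets(credentials)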
def build_sql_in_list(series)
-
Generate a properly formatted list to be a target for a SQL 'IN' clause
Args
series
:pd.Series
- Series of values to be included in the 'IN' list
Returns
str
- Values formatted as (1, 2, 3) for numbers or ('a', 'b', 'c') for anything else
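A short, self-contained illustration of the output format:
import pandas as pd
from palletjack import utils
counties = pd.Series(['Salt Lake', 'Utah', 'Davis'])
print(utils.build_sql_in_list(pd.Series([3, 11, 35])))  #: (3, 11, 35)
print(utils.build_sql_in_list(counties))  #: ('Salt Lake', 'Utah', 'Davis')
where_clause = f'county_name IN {utils.build_sql_in_list(counties)}'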
def calc_modulus_for_reporting_interval(n, split_value=500)
-
Calculate a number that can be used as a modulus for splitting n up into 10 or 20 intervals, depending on split_value.
Args
n
:int
- The number to divide into intervals
split_value
:int, optional
- The point at which it should create 20 intervals instead of 10. Defaults to 500.
Returns
int
- Number to be used as modulus to compare to 0 in reporting code
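For example, throttling progress logging over a made-up list of rows:
from palletjack import utils
rows = list(range(137))  #: stand-in for records being processed
interval = utils.calc_modulus_for_reporting_interval(len(rows))  #: floor(137 / 10) = 13
for i, row in enumerate(rows):
    if i % interval == 0:
        print(f'processing row {i} of {len(rows)}')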
def check_field_set_to_unique(featurelayer, field_name)
-
Makes sure field_name has a "unique constraint" in AGOL, which allows it to be used for .append upserts
Args
featurelayer
:arcgis.features.FeatureLayer
- The target feature layer
field_name
:str
- The AGOL-valid field name to check
Raises
RuntimeError
- If the field is not unique (or if it's indexed but not unique)
def check_fields_match(featurelayer, new_dataframe)
-
Make sure new data doesn't have any extra fields, warn if it doesn't contain all live fields
Args
featurelayer
:arcgis.features.FeatureLayer
- Live data
new_dataframe
:pd.DataFrame
- New data
Raises
RuntimeError
- If new data contains a field not present in the live data
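A usage sketch; feature_layer is assumed to be an existing arcgis.features.FeatureLayer and the new data is made up:
import pandas as pd
from palletjack import utils
new_df = pd.DataFrame({'name': ['Millcreek'], 'population': [63000]})
utils.check_fields_match(feature_layer, new_df)  #: raises RuntimeError on extra fields, warns on missing fields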
def check_index_column_in_feature_layer(featurelayer, index_column)
-
Ensure index_column is present for any future operations
Args
featurelayer
:arcgis.features.FeatureLayer
- The live feature layer
index_column
:str
- The index column meant to link new and live data
Raises
RuntimeError
- If index_column is not in featurelayer's fields
def chunker(sequence, chunk_size)
-
Break sequence into chunk_size chunks
Args
sequence
:iterable
- Any iterable sequence
chunk_size
:int
- Desired number of elements in each chunk
Returns
generator
- Generator of original sequence broken into chunk_size lists
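A quick, self-contained example:
from palletjack import utils
oids = [11, 12, 13, 14, 15, 16, 17]
for chunk in utils.chunker(oids, 3):
    print(chunk)  #: [11, 12, 13], then [14, 15, 16], then [17]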
def fix_numeric_empty_strings(feature_set, feature_layer_fields)
-
Replace empty strings with None for numeric fields that allow nulls
Args
feature_set
:arcgis.features.FeatureSet
- Feature set to clean
feature_layer_fields
:list[dict]
- Fields from the feature layer (feature_layer.properties.fields)
Returns
arcgis.features.FeatureSet
- The feature set with empty strings in nullable numeric fields replaced by None
def get_null_geometries(feature_layer_properties)
-
Generate placeholder geometries near 0, 0 with type based on provided feature layer properties dictionary.
Args
feature_layer_properties
:dict
- .properties from a feature layer item, contains 'geometryType' key
Raises
NotImplementedError
- If we get a geometryType we haven't implemented a null-geometry generator for
Returns
arcgis.geometry.Geometry
- A geometry object of the corresponding type centered around null island.
def rename_columns_for_agol(columns)
-
Replace special characters and spaces with '_' to match AGOL field names
Args
columns
:iter
- The new columns to be renamed
Returns
Dict
- Mapping {'original name': 'cleaned_name'}
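An illustrative, self-contained example (note how leading digits are moved to the end of the cleaned name):
import pandas as pd
from palletjack import utils
df = pd.DataFrame(columns=['Parcel ID', 'Owner%Name', '2020 Count'])
mapping = utils.rename_columns_for_agol(df.columns)
#: {'Parcel ID': 'Parcel_ID', 'Owner%Name': 'Owner_Name', '2020 Count': 'Count2020_'}
cleaned = df.rename(columns=mapping)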
def rename_fields(dataframe, field_mapping)
-
Rename fields based on field_mapping
Args
dataframe
:pd.DataFrame
- Dataframe with columns to be renamed
field_mapping
:dict
- Mapping of existing field names to new names
Raises
ValueError
- If an existing name from field_mapping is not found in dataframe.columns
Returns
pd.DataFrame
- Dataframe with renamed fields
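A small, self-contained example:
import pandas as pd
from palletjack import utils
df = pd.DataFrame({'addr': ['100 S Main St'], 'zip': ['84111']})
renamed = utils.rename_fields(df, {'addr': 'street_address', 'zip': 'zip_code'})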
def retry(worker_method, *args, **kwargs)
-
Allows you to retry a function/method to overcome network jitters or other transient errors.
Retries worker_method RETRY_MAX_TRIES times (for a total of n+1 tries, including the initial attempt), pausing RETRY_DELAY_TIME**n seconds before the nth retry (exponential backoff). Any arguments for worker_method can be passed in as additional parameters to retry() following worker_method: retry(foo_method, arg1, arg2, keyword_arg=3).
RETRY_MAX_TRIES and RETRY_DELAY_TIME default to 3 tries and 2 seconds, but can be overridden by setting the palletjack.utils.RETRY_MAX_TRIES and palletjack.utils.RETRY_DELAY_TIME constants in the client script.
Args
worker_method
:callable
- The name of the method to be retried (minus the calling parens)
Raises
error
- The final error that causes worker_method to fail after RETRY_MAX_TRIES retries
Returns
various
- The value(s) returned by worker_method
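A self-contained sketch using a stand-in for a jittery network call; with the default constants the first two retries wait 2 and 4 seconds:
from palletjack import utils
attempts = []
def flaky_call(url):
    #: Stand-in for a transient network error: fails twice, then succeeds
    attempts.append(url)
    if len(attempts) < 3:
        raise RuntimeError('transient error')
    return f'response from {url}'
result = utils.retry(flaky_call, 'https://example.com')  #: succeeds on the third attempt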
def save_feature_layer_to_gdb(feature_layer, directory)
-
Save a feature_layer to a gdb for safety as backup.gdb/{layer name}_{today's date}
Args
feature_layer
:arcgis.features.FeatureLayer
- The FeatureLayer object to save to disk.
directory
:str or Path
- The directory to save the data to.
Returns
Path
- The full path to the output file, named with the layer name and today's date.
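A usage sketch; the org URL, credentials, item ID, and output directory are placeholders:
import arcgis
from palletjack import utils
gis = arcgis.gis.GIS('https://www.arcgis.com', 'username', 'password')
feature_layer = gis.content.get('<item id>').layers[0]
backup_path = utils.save_feature_layer_to_gdb(feature_layer, r'c:\temp\backups')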
def sedf_to_gdf(dataframe)
-
Convert an Esri Spatially Enabled DataFrame to a GeoPandas GeoDataFrame
Args
dataframe
:pd.DataFrame.spatial
- Esri spatially enabled dataframe to convert
Returns
GeoPandas.DataFrame
- dataframe converted to GeoDataFrame
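A small sketch that builds a spatially-enabled dataframe from x/y columns and converts it:
import pandas as pd
from arcgis.features import GeoAccessor  #: registers the pandas .spatial accessor
from palletjack import utils
df = pd.DataFrame({'name': ['Null Island'], 'x': [0.0], 'y': [0.0]})
sedf = pd.DataFrame.spatial.from_xy(df, 'x', 'y', sr=4326)
gdf = utils.sedf_to_gdf(sedf)
print(gdf.crs)  #: EPSG:4326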
Classes
class Chunking
-
Divide a dataframe into chunks to satisfy upload size requirements for append operation.
Static methods
def build_upload_json(dataframe, feature_layer_fields, max_bytes=100000000)
-
Create list of geojson strings of spatially-enabled DataFrame, divided into chunks if it exceeds max_bytes
Recursively chunks dataframe to ensure no one chunk is larger than max_bytes. Converts all empty strings in nullable numeric fields in feature sets created from individual chunks to None prior to converting to geojson to ensure the field stays numeric.
Args
dataframe
:pd.DataFrame.spatial
- Spatially-enabled dataframe to be converted to geojson
feature_layer_fields
- All the fields from the feature layer (feature_layer.properties.fields)
max_bytes
:int, optional
- Maximum size in bytes any one geojson string can be. Defaults to 100000000 (AGOL text uploads appear to be limited to 100 MB)
Returns
list[str]
- A list of the dataframe chunks converted to geojson
Expand source code
@staticmethod def build_upload_json(dataframe, feature_layer_fields, max_bytes=100_000_000): """Create list of geojson strings of spatially-enabled DataFrame, divided into chunks if it exceeds max_bytes Recursively chunks dataframe to ensure no one chunk is larger than max_bytes. Converts all empty strings in nullable numeric fields in feature sets created from individual chunks to None prior to converting to geojson to ensure the field stays numeric. Args: dataframe (pd.DataFrame.spatial): Spatially-enabled dataframe to be converted to geojson feature_layer_fields: All the fields from the feature layer (feature_layer.properties.fields) max_bytes (int, optional): Maximum size in bytes any one geojson string can be. Defaults to 100000000 (AGOL text uploads are limited to 100 MB?) Returns: list[str]: A list of the dataframe chunks converted to geojson """ geojson_size = sys.getsizeof(dataframe.spatial.to_featureset().to_geojson.encode('utf-16')) module_logger.debug('Initial file size: %s', geojson_size) chunked_dataframes = Chunking._recursive_dataframe_chunking(dataframe, max_bytes) chunked_geojsons = [ fix_numeric_empty_strings(chunk.spatial.to_featureset(), feature_layer_fields).to_geojson for chunk in chunked_dataframes ] return chunked_geojsons
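A hedged usage sketch; sedf and layer are placeholders for an existing spatially-enabled DataFrame and its target arcgis.features.FeatureLayer, and the resulting strings would typically be handed to an append operation:
    from palletjack import utils

    #: `sedf` is an existing spatially-enabled DataFrame; `layer` is the target FeatureLayer
    geojson_chunks = utils.Chunking.build_upload_json(
        sedf,
        layer.properties.fields,
        max_bytes=50_000_000,  #: tighter than the 100 MB default
    )
    for chunk in geojson_chunks:
        print(f'chunk size: {len(chunk.encode("utf-16")):,} bytes')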
class DeleteUtils
-
Verify Object IDs used for delete operations
Expand source code
class DeleteUtils: """Verify Object IDs used for delete operations """ @staticmethod def check_delete_oids_are_ints(oid_list): """Raise an error if a list of strings can't be parsed as ints Args: oid_list (list[int]): List of Object IDs to delete Raises: TypeError: If any of the items in oid_list can't be cast to ints Returns: list[int]: oid_list converted to ints """ numeric_oids = [] bad_oids = [] for oid in oid_list: try: numeric_oids.append(int(oid)) except ValueError: bad_oids.append(oid) if bad_oids: raise TypeError(f'Couldn\'t convert OBJECTID(s) `{bad_oids}` to integer') return numeric_oids @staticmethod def check_for_empty_oid_list(oid_list, numeric_oids): """Raise an error if the parsed Object ID list is empty Args: oid_list (list[int]): The original list of Object IDs to delete numeric_oids (list[int]): The cast-to-int Object IDs Raises: ValueError: If numeric_oids is empty """ if not numeric_oids: raise ValueError(f'No OBJECTIDs found in {oid_list}') @staticmethod def check_delete_oids_are_in_live_data(oid_string, numeric_oids, feature_layer): """Warn if a delete Object ID doesn't exist in the live data, return number missing Args: oid_string (str): Comma-separated string of delete Object IDs numeric_oids (list[int]): The parsed and cast-to-int Object IDs feature_layer (arcgis.features.FeatureLayer): Live FeatureLayer item Raises: UserWarning: If any of the Object IDs in numeric_oids don't exist in the live data. Returns: int: Number of Object IDs missing from live data """ query_results = feature_layer.query(object_ids=oid_string, return_ids_only=True) query_oids = query_results['objectIds'] oids_not_in_layer = set(numeric_oids) - set(query_oids) if oids_not_in_layer: warnings.warn(f'OBJECTIDs {oids_not_in_layer} were not found in the live data') return len(oids_not_in_layer)
Static methods
def check_delete_oids_are_in_live_data(oid_string, numeric_oids, feature_layer)
-
Warn if a delete Object ID doesn't exist in the live data, return number missing
Args
oid_string
:str
- Comma-separated string of delete Object IDs
numeric_oids
:list[int]
- The parsed and cast-to-int Object IDs
feature_layer
:arcgis.features.FeatureLayer
- Live FeatureLayer item
Raises
UserWarning
- If any of the Object IDs in numeric_oids don't exist in the live data.
Returns
int
- Number of Object IDs missing from live data
Expand source code
@staticmethod def check_delete_oids_are_in_live_data(oid_string, numeric_oids, feature_layer): """Warn if a delete Object ID doesn't exist in the live data, return number missing Args: oid_string (str): Comma-separated string of delete Object IDs numeric_oids (list[int]): The parsed and cast-to-int Object IDs feature_layer (arcgis.features.FeatureLayer): Live FeatureLayer item Raises: UserWarning: If any of the Object IDs in numeric_oids don't exist in the live data. Returns: int: Number of Object IDs missing from live data """ query_results = feature_layer.query(object_ids=oid_string, return_ids_only=True) query_oids = query_results['objectIds'] oids_not_in_layer = set(numeric_oids) - set(query_oids) if oids_not_in_layer: warnings.warn(f'OBJECTIDs {oids_not_in_layer} were not found in the live data') return len(oids_not_in_layer)
def check_delete_oids_are_ints(oid_list)
-
Raise an error if a list of strings can't be parsed as ints
Args
oid_list
:list[int]
- List of Object IDs to delete
Raises
TypeError
- If any of the items in oid_list can't be cast to ints
Returns
list[int]
- oid_list converted to ints
Expand source code
@staticmethod def check_delete_oids_are_ints(oid_list): """Raise an error if a list of strings can't be parsed as ints Args: oid_list (list[int]): List of Object IDs to delete Raises: TypeError: If any of the items in oid_list can't be cast to ints Returns: list[int]: oid_list converted to ints """ numeric_oids = [] bad_oids = [] for oid in oid_list: try: numeric_oids.append(int(oid)) except ValueError: bad_oids.append(oid) if bad_oids: raise TypeError(f'Couldn\'t convert OBJECTID(s) `{bad_oids}` to integer') return numeric_oids
def check_for_empty_oid_list(oid_list, numeric_oids)
-
Raise an error if the parsed Object ID list is empty
Args
oid_list
:list[int]
- The original list of Object IDs to delete
numeric_oids
:list[int]
- The cast-to-int Object IDs
Raises
ValueError
- If numeric_oids is empty
Expand source code
@staticmethod def check_for_empty_oid_list(oid_list, numeric_oids): """Raise an error if the parsed Object ID list is empty Args: oid_list (list[int]): The original list of Object IDs to delete numeric_oids (list[int]): The cast-to-int Object IDs Raises: ValueError: If numeric_oids is empty """ if not numeric_oids: raise ValueError(f'No OBJECTIDs found in {oid_list}')
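A hedged sketch chaining the three checks before a delete call; layer is a placeholder arcgis.features.FeatureLayer and the Object IDs are illustrative:
    from palletjack import utils

    oid_list = ['11', '12', '13']  #: raw values, e.g. read from a spreadsheet
    numeric_oids = utils.DeleteUtils.check_delete_oids_are_ints(oid_list)  #: TypeError on non-ints
    utils.DeleteUtils.check_for_empty_oid_list(oid_list, numeric_oids)  #: ValueError if nothing parsed

    oid_string = ','.join(str(oid) for oid in numeric_oids)
    #: warns about (and counts) OIDs missing from the live data; `layer` is a placeholder FeatureLayer
    missing_count = utils.DeleteUtils.check_delete_oids_are_in_live_data(oid_string, numeric_oids, layer)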
class FieldChecker (live_data_properties, new_dataframe)
-
Check the fields of a new dataframe against live data. Each method will raise errors if its checks fail. Provides the check_fields class method to run all the checks in one call without having to create an object.
Args
live_data_properties
:dict
- FeatureLayer.properties of live data
new_dataframe
:pd.DataFrame
- New data to be checked
Expand source code
class FieldChecker: """Check the fields of a new dataframe against live data. Each method will raise errors if its checks fail. Provides the check_fields class method to run all the checks in one call with having to create an object. """ @classmethod def check_fields(cls, live_data_properties, new_dataframe, fields, add_oid): """Run all the field checks, raising errors and warnings where they fail. Check individual method docstrings for details and specific errors raised. Args: live_data_properties (dict): FeatureLayer.properties of live data new_dataframe (pd.DataFrame): New data to be checked fields (List[str]): Fields to check add_oid (bool): Add OBJECTID to fields if its not already present (for operations that are dependent on OBJECTID, such as upsert) """ field_checker = cls(live_data_properties, new_dataframe) field_checker.check_fields_present(fields, add_oid=add_oid) field_checker.check_live_and_new_field_types_match(fields) field_checker.check_for_non_null_fields(fields) field_checker.check_field_length(fields) # field_checker.check_srs_wgs84() field_checker.check_nullable_ints_shapely() def __init__(self, live_data_properties, new_dataframe): """ Args: live_data_properties (dict): FeatureLayer.properties of live data new_dataframe (pd.DataFrame): New data to be checked """ self.live_data_properties = live_data_properties self.fields_dataframe = pd.DataFrame(live_data_properties.fields) self.new_dataframe = new_dataframe def check_live_and_new_field_types_match(self, fields): """Raise an error if the field types of the live and new data don't match. Uses a dictionary mapping Esri field types to pandas dtypes. If 'SHAPE' is included in the fields, it calls _check_geometry_types to verify the spatial types are compatible. Args: fields (List[str]): Fields to be updated Raises: ValueError: If the field types or spatial types are incompatible, the new data has multiple geometry types, or the new data is not a valid spatially-enabled dataframe. NotImplementedError: If the live data has a field that has not yet been mapped to a pandas dtype. """ #: Converting dtypes to str and comparing seems to be the only way to break out into shorts and longs, singles #: and doubles. Otherwise, checking subclass is probably more pythonic. 
short_ints = ['uint8', 'uint16', 'int8', 'int16'] long_ints = ['int', 'uint32', 'uint64', 'int32', 'int64'] #: Leaving the commented types here for future implementation if necessary esri_to_pandas_types_mapping = { 'esriFieldTypeInteger': ['int'] + short_ints + long_ints, 'esriFieldTypeSmallInteger': short_ints, 'esriFieldTypeDouble': ['float', 'float32', 'float64'], 'esriFieldTypeSingle': ['float32'], 'esriFieldTypeString': ['str', 'object', 'string'], 'esriFieldTypeDate': ['datetime64[ns]'], 'esriFieldTypeGeometry': ['geometry'], 'esriFieldTypeOID': ['int'] + short_ints + long_ints, # 'esriFieldTypeBlob': [], 'esriFieldTypeGlobalID': ['str', 'object', 'string'], # 'esriFieldTypeRaster': [], 'esriFieldTypeGUID': ['str', 'object', 'string'], # 'esriFieldTypeXML': [], } #: geometry checking gets its own function if 'SHAPE' in fields: self._check_geometry_types() fields.remove('SHAPE') fields_to_check = self.fields_dataframe[self.fields_dataframe['name'].isin(fields)].set_index('name') invalid_fields = [] int_fields_as_floats = [] datetime_fields_with_timezone = [] for field in fields: #: check against the str.lower to catch normal dtypes (int64) and the new, pd.NA-aware dtypes (Int64) new_dtype = str(self.new_dataframe[field].dtype).lower() live_type = fields_to_check.loc[field, 'type'] try: if new_dtype not in esri_to_pandas_types_mapping[live_type]: invalid_fields.append((field, live_type, str(self.new_dataframe[field].dtype))) if new_dtype in ['float', 'float32', 'float64' ] and live_type in ['esriFieldTypeInteger', 'esriFieldTypeSmallInteger']: int_fields_as_floats.append(field) if 'datetime64' in new_dtype and new_dtype != 'datetime64[ns]' and live_type == 'esriFieldTypeDate': datetime_fields_with_timezone.append(field) except KeyError: # pylint: disable-next=raise-missing-from raise NotImplementedError(f'Live field "{field}" type "{live_type}" not yet mapped to a pandas dtype') if invalid_fields: if int_fields_as_floats: raise IntFieldAsFloatError( f'Field type incompatibilities (field, live type, new type): {invalid_fields}\n' \ 'Check the following int fields for null/np.nan values and convert to panda\'s nullable int '\ f'dtype: {", ".join(int_fields_as_floats)}' ) if datetime_fields_with_timezone: raise TimezoneAwareDatetimeError( f'Field type incompatibilities (field, live type, new type): {invalid_fields}\n' \ 'Check the following datetime fields for timezone aware dtypes values and convert to '\ 'timezone-naive dtypes using pd.to_datetime(df[\'field\']).dt.tz_localize(None): '\ f'{", ".join(datetime_fields_with_timezone)}' ) raise ValueError(f'Field type incompatibilities (field, live type, new type): {invalid_fields}') def _check_geometry_types(self): """Raise an error if the live and new data geometry types are incompatible. Args: live_data_properties (dict): FeatureLayer.properties of live data new_dataframe (pd.DataFrame): New data to be added/updated Raises: ValueError: If the new data is not a valid spatially-enabled dataframe, has multiple geometry types, or has a geometry type that doesn't match the live data. 
""" esri_to_sedf_geometry_mapping = { 'esriGeometryPoint': 'point', 'esriGeometryMultipoint': 'multipoint', 'esriGeometryPolyline': 'polyline', 'esriGeometryPolygon': 'polygon', 'esriGeometryEnvelope': 'envelope', } if 'SHAPE' not in self.new_dataframe.columns: raise ValueError('New dataframe does not have a SHAPE column') if self.new_dataframe['SHAPE'].isna().any(): raise ValueError( f'New dataframe has missing geometries at index {list(self.new_dataframe[self.new_dataframe["SHAPE"].isna()].index)}' ) live_geometry_type = self.live_data_properties.geometryType new_geometry_types = self.new_dataframe.spatial.geometry_type if len(new_geometry_types) > 1: raise ValueError('New dataframe has multiple geometry types') if esri_to_sedf_geometry_mapping[live_geometry_type] != new_geometry_types[0].lower(): raise ValueError( f'New dataframe geometry type "{new_geometry_types[0]}" incompatible with live geometry type "{live_geometry_type}"' ) def check_for_non_null_fields(self, fields): """Raise an error if the new data contains nulls in a field that the live data says is not nullable. If this error occurs, the client should use pandas fillna() method to replace NaNs/Nones with empty strings or appropriate nodata values. Args: fields (List[str]): Fields to check Raises: ValueError: If the new data contains nulls in a field that the live data says is not nullable and doesn't have a default value. """ columns_with_nulls = self.new_dataframe.columns[self.new_dataframe.isna().any()].tolist() # fields_dataframe = pd.DataFrame(self.live_data_properties['fields']) non_nullable_live_columns = self.fields_dataframe[ ~(self.fields_dataframe['nullable']) & ~(self.fields_dataframe['defaultValue'].astype(bool))]['name'].tolist() columns_to_check = [column for column in columns_with_nulls if column in fields] #: If none of the columns have nulls, we don't need to check further if not columns_to_check: return problem_fields = [] for column in columns_to_check: if column in non_nullable_live_columns: problem_fields.append(column) if problem_fields: raise ValueError( f'The following fields cannot have null values in the live data but one or more nulls exist in the new data: {", ".join(problem_fields)}' ) def check_field_length(self, fields): """Raise an error if a new data string value is longer than allowed in the live data. Args: fields (List[str]): Fields to check Raises: ValueError: If the string fields in the new data contain a value longer than the corresponding field in the live data allows. """ if 'length' not in self.fields_dataframe.columns: module_logger.debug('No fields with length property') return length_limited_fields = self.fields_dataframe[ (self.fields_dataframe['type'].isin(['esriFieldTypeString', 'esriFieldTypeGlobalID'])) & (self.fields_dataframe['length'].astype(bool))] columns_to_check = length_limited_fields[length_limited_fields['name'].isin(fields)] for field, live_max_length in columns_to_check[['name', 'length']].to_records(index=False): new_data_lengths = self.new_dataframe[field].str.len() new_max_length = new_data_lengths.max() if new_max_length > live_max_length: raise ValueError( f'Row {new_data_lengths.argmax()}, column {field} in new data exceeds the live data max length of {live_max_length}' ) def check_fields_present(self, fields, add_oid): """Raise an error if the fields to be operated on aren't present in either the live or new data. Args: fields (List[str]): The fields to be operated on. 
add_oid (bool): Add OBJECTID to fields if its not already present (for operations that are dependent on OBJECTID, such as upsert) Raises: RuntimeError: If any of fields are not in live or new data. """ live_fields = set(self.fields_dataframe['name']) new_fields = set(self.new_dataframe.columns) working_fields = set(fields) working_fields.discard('SHAPE') #: The fields from the feature layer properties don't include the SHAPE field. if add_oid: working_fields.add('OBJECTID') live_dif = working_fields - live_fields new_dif = working_fields - new_fields error_message = [] if live_dif: error_message.append(f'Fields missing in live data: {", ".join(live_dif)}') if new_dif: error_message.append(f'Fields missing in new data: {", ".join(new_dif)}') if error_message: raise RuntimeError('. '.join(error_message)) def check_srs_wgs84(self): """Raise an error if the new spatial reference system isn't WGS84 as required by geojson. Raises: ValueError: If the new SRS value can't be cast to an int (please log an issue if this occurs) ValueError: If the new SRS value isn't 4326. """ #: If we modify a spatial data frame, sometimes the .sr.wkid property/dictionary becomes {0:number} instead #: of {'wkid': number} try: new_srs = self.new_dataframe.spatial.sr.wkid except AttributeError: new_srs = self.new_dataframe.spatial.sr[0] try: new_srs = int(new_srs) except ValueError as error: raise ValueError('Could not cast new SRS to int') from error if new_srs != 4326: raise ValueError( f'New dataframe SRS {new_srs} is not wkid 4326. Reproject with appropriate transformation.' ) def check_nullable_ints_shapely(self): """Raise a warning if null values occur within nullable integer fields of the dataframe Apparently due to a convention within shapely, any null values in an integer field are converted to 0. Raises: UserWarning: If we're using shapely instead of arcpy, the new dataframe uses nullable int dtypes, and there is one or more pd.NA values within a nullable int column. """ #: Only occurs if client is using shapely instead of arcpy if importlib.util.find_spec('arcpy'): return nullable_ints = {'Int8', 'Int16', 'Int32', 'Int64', 'UInt8', 'UInt16', 'UInt32', 'UInt64'} nullable_int_columns = [ column for column in self.new_dataframe.columns if str(self.new_dataframe[column].dtype) in nullable_ints ] columns_with_nulls = [column for column in nullable_int_columns if self.new_dataframe[column].isnull().any()] if columns_with_nulls: warnings.warn( 'The following columns have null values that will be replaced by 0 due to shapely conventions: '\ f'{", ".join(columns_with_nulls)}' )
Static methods
def check_fields(live_data_properties, new_dataframe, fields, add_oid)
-
Run all the field checks, raising errors and warnings where they fail.
Check individual method docstrings for details and specific errors raised.
Args
live_data_properties
:dict
- FeatureLayer.properties of live data
new_dataframe
:pd.DataFrame
- New data to be checked
fields
:List[str]
- Fields to check
add_oid
:bool
- Add OBJECTID to fields if it's not already present (for operations that depend on OBJECTID, such as upsert)
Expand source code
@classmethod def check_fields(cls, live_data_properties, new_dataframe, fields, add_oid): """Run all the field checks, raising errors and warnings where they fail. Check individual method docstrings for details and specific errors raised. Args: live_data_properties (dict): FeatureLayer.properties of live data new_dataframe (pd.DataFrame): New data to be checked fields (List[str]): Fields to check add_oid (bool): Add OBJECTID to fields if its not already present (for operations that are dependent on OBJECTID, such as upsert) """ field_checker = cls(live_data_properties, new_dataframe) field_checker.check_fields_present(fields, add_oid=add_oid) field_checker.check_live_and_new_field_types_match(fields) field_checker.check_for_non_null_fields(fields) field_checker.check_field_length(fields) # field_checker.check_srs_wgs84() field_checker.check_nullable_ints_shapely()
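A hedged usage sketch; layer and new_df are placeholders and the field names are illustrative:
    from palletjack import utils

    fields = ['OBJECTID', 'name', 'count', 'SHAPE']  #: hypothetical field list
    #: raises on the first failed check; otherwise passes silently (or emits a warning)
    utils.FieldChecker.check_fields(layer.properties, new_df, fields, add_oid=True)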
Methods
def check_field_length(self, fields)
-
Raise an error if a new data string value is longer than allowed in the live data.
Args
fields
:List[str]
- Fields to check
Raises
ValueError
- If the string fields in the new data contain a value longer than the corresponding field in the live data allows.
Expand source code
def check_field_length(self, fields): """Raise an error if a new data string value is longer than allowed in the live data. Args: fields (List[str]): Fields to check Raises: ValueError: If the string fields in the new data contain a value longer than the corresponding field in the live data allows. """ if 'length' not in self.fields_dataframe.columns: module_logger.debug('No fields with length property') return length_limited_fields = self.fields_dataframe[ (self.fields_dataframe['type'].isin(['esriFieldTypeString', 'esriFieldTypeGlobalID'])) & (self.fields_dataframe['length'].astype(bool))] columns_to_check = length_limited_fields[length_limited_fields['name'].isin(fields)] for field, live_max_length in columns_to_check[['name', 'length']].to_records(index=False): new_data_lengths = self.new_dataframe[field].str.len() new_max_length = new_data_lengths.max() if new_max_length > live_max_length: raise ValueError( f'Row {new_data_lengths.argmax()}, column {field} in new data exceeds the live data max length of {live_max_length}' )
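If this check raises, one hedged remedy is to trim the offending column to the live limit before re-running the checks; layer, new_df, and the 'comments' field are placeholders:
    import pandas as pd

    fields_df = pd.DataFrame(layer.properties.fields)
    live_length = int(fields_df.set_index('name').loc['comments', 'length'])
    new_df['comments'] = new_df['comments'].str.slice(0, live_length)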
def check_fields_present(self, fields, add_oid)
-
Raise an error if the fields to be operated on aren't present in either the live or new data.
Args
fields
:List[str]
- The fields to be operated on.
add_oid
:bool
- Add OBJECTID to fields if it's not already present (for operations that depend on OBJECTID, such as upsert)
Raises
RuntimeError
- If any of fields are not in live or new data.
Expand source code
def check_fields_present(self, fields, add_oid): """Raise an error if the fields to be operated on aren't present in either the live or new data. Args: fields (List[str]): The fields to be operated on. add_oid (bool): Add OBJECTID to fields if its not already present (for operations that are dependent on OBJECTID, such as upsert) Raises: RuntimeError: If any of fields are not in live or new data. """ live_fields = set(self.fields_dataframe['name']) new_fields = set(self.new_dataframe.columns) working_fields = set(fields) working_fields.discard('SHAPE') #: The fields from the feature layer properties don't include the SHAPE field. if add_oid: working_fields.add('OBJECTID') live_dif = working_fields - live_fields new_dif = working_fields - new_fields error_message = [] if live_dif: error_message.append(f'Fields missing in live data: {", ".join(live_dif)}') if new_dif: error_message.append(f'Fields missing in new data: {", ".join(new_dif)}') if error_message: raise RuntimeError('. '.join(error_message))
def check_for_non_null_fields(self, fields)
-
Raise an error if the new data contains nulls in a field that the live data says is not nullable.
If this error occurs, the client should use the pandas fillna() method to replace NaNs/Nones with empty strings or appropriate nodata values.
Args
fields
:List[str]
- Fields to check
Raises
ValueError
- If the new data contains nulls in a field that the live data says is not nullable and doesn't have a default value.
Expand source code
def check_for_non_null_fields(self, fields): """Raise an error if the new data contains nulls in a field that the live data says is not nullable. If this error occurs, the client should use pandas fillna() method to replace NaNs/Nones with empty strings or appropriate nodata values. Args: fields (List[str]): Fields to check Raises: ValueError: If the new data contains nulls in a field that the live data says is not nullable and doesn't have a default value. """ columns_with_nulls = self.new_dataframe.columns[self.new_dataframe.isna().any()].tolist() # fields_dataframe = pd.DataFrame(self.live_data_properties['fields']) non_nullable_live_columns = self.fields_dataframe[ ~(self.fields_dataframe['nullable']) & ~(self.fields_dataframe['defaultValue'].astype(bool))]['name'].tolist() columns_to_check = [column for column in columns_with_nulls if column in fields] #: If none of the columns have nulls, we don't need to check further if not columns_to_check: return problem_fields = [] for column in columns_to_check: if column in non_nullable_live_columns: problem_fields.append(column) if problem_fields: raise ValueError( f'The following fields cannot have null values in the live data but one or more nulls exist in the new data: {", ".join(problem_fields)}' )
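A hedged remedy following the docstring's advice, assuming a string field 'status' and an integer field 'count' (both placeholders):
    new_df['status'] = new_df['status'].fillna('')  #: empty string as the nodata value
    new_df['count'] = new_df['count'].fillna(0).astype('int64')  #: 0 as the nodata value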
def check_live_and_new_field_types_match(self, fields)
-
Raise an error if the field types of the live and new data don't match.
Uses a dictionary mapping Esri field types to pandas dtypes. If 'SHAPE' is included in the fields, it calls _check_geometry_types to verify the spatial types are compatible.
Args
fields
:List[str]
- Fields to be updated
Raises
ValueError
- If the field types or spatial types are incompatible, the new data has multiple geometry types, or the new data is not a valid spatially-enabled dataframe.
NotImplementedError
- If the live data has a field that has not yet been mapped to a pandas dtype.
Expand source code
def check_live_and_new_field_types_match(self, fields): """Raise an error if the field types of the live and new data don't match. Uses a dictionary mapping Esri field types to pandas dtypes. If 'SHAPE' is included in the fields, it calls _check_geometry_types to verify the spatial types are compatible. Args: fields (List[str]): Fields to be updated Raises: ValueError: If the field types or spatial types are incompatible, the new data has multiple geometry types, or the new data is not a valid spatially-enabled dataframe. NotImplementedError: If the live data has a field that has not yet been mapped to a pandas dtype. """ #: Converting dtypes to str and comparing seems to be the only way to break out into shorts and longs, singles #: and doubles. Otherwise, checking subclass is probably more pythonic. short_ints = ['uint8', 'uint16', 'int8', 'int16'] long_ints = ['int', 'uint32', 'uint64', 'int32', 'int64'] #: Leaving the commented types here for future implementation if necessary esri_to_pandas_types_mapping = { 'esriFieldTypeInteger': ['int'] + short_ints + long_ints, 'esriFieldTypeSmallInteger': short_ints, 'esriFieldTypeDouble': ['float', 'float32', 'float64'], 'esriFieldTypeSingle': ['float32'], 'esriFieldTypeString': ['str', 'object', 'string'], 'esriFieldTypeDate': ['datetime64[ns]'], 'esriFieldTypeGeometry': ['geometry'], 'esriFieldTypeOID': ['int'] + short_ints + long_ints, # 'esriFieldTypeBlob': [], 'esriFieldTypeGlobalID': ['str', 'object', 'string'], # 'esriFieldTypeRaster': [], 'esriFieldTypeGUID': ['str', 'object', 'string'], # 'esriFieldTypeXML': [], } #: geometry checking gets its own function if 'SHAPE' in fields: self._check_geometry_types() fields.remove('SHAPE') fields_to_check = self.fields_dataframe[self.fields_dataframe['name'].isin(fields)].set_index('name') invalid_fields = [] int_fields_as_floats = [] datetime_fields_with_timezone = [] for field in fields: #: check against the str.lower to catch normal dtypes (int64) and the new, pd.NA-aware dtypes (Int64) new_dtype = str(self.new_dataframe[field].dtype).lower() live_type = fields_to_check.loc[field, 'type'] try: if new_dtype not in esri_to_pandas_types_mapping[live_type]: invalid_fields.append((field, live_type, str(self.new_dataframe[field].dtype))) if new_dtype in ['float', 'float32', 'float64' ] and live_type in ['esriFieldTypeInteger', 'esriFieldTypeSmallInteger']: int_fields_as_floats.append(field) if 'datetime64' in new_dtype and new_dtype != 'datetime64[ns]' and live_type == 'esriFieldTypeDate': datetime_fields_with_timezone.append(field) except KeyError: # pylint: disable-next=raise-missing-from raise NotImplementedError(f'Live field "{field}" type "{live_type}" not yet mapped to a pandas dtype') if invalid_fields: if int_fields_as_floats: raise IntFieldAsFloatError( f'Field type incompatibilities (field, live type, new type): {invalid_fields}\n' \ 'Check the following int fields for null/np.nan values and convert to panda\'s nullable int '\ f'dtype: {", ".join(int_fields_as_floats)}' ) if datetime_fields_with_timezone: raise TimezoneAwareDatetimeError( f'Field type incompatibilities (field, live type, new type): {invalid_fields}\n' \ 'Check the following datetime fields for timezone aware dtypes values and convert to '\ 'timezone-naive dtypes using pd.to_datetime(df[\'field\']).dt.tz_localize(None): '\ f'{", ".join(datetime_fields_with_timezone)}' ) raise ValueError(f'Field type incompatibilities (field, live type, new type): {invalid_fields}')
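Hedged remedies for the two specialized errors raised above, with placeholder column names:
    import pandas as pd

    #: IntFieldAsFloatError: nulls forced an integer column to float; use pandas' nullable Int dtype
    new_df['count'] = new_df['count'].astype('Int64')
    #: TimezoneAwareDatetimeError: strip the timezone so the dtype becomes datetime64[ns]
    new_df['observed'] = pd.to_datetime(new_df['observed']).dt.tz_localize(None)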
def check_nullable_ints_shapely(self)
-
Raise a warning if null values occur within nullable integer fields of the dataframe
Apparently due to a convention within shapely, any null values in an integer field are converted to 0.
Raises
UserWarning
- If we're using shapely instead of arcpy, the new dataframe uses nullable int dtypes, and there are one or more pd.NA values within a nullable int column.
Expand source code
def check_nullable_ints_shapely(self): """Raise a warning if null values occur within nullable integer fields of the dataframe Apparently due to a convention within shapely, any null values in an integer field are converted to 0. Raises: UserWarning: If we're using shapely instead of arcpy, the new dataframe uses nullable int dtypes, and there is one or more pd.NA values within a nullable int column. """ #: Only occurs if client is using shapely instead of arcpy if importlib.util.find_spec('arcpy'): return nullable_ints = {'Int8', 'Int16', 'Int32', 'Int64', 'UInt8', 'UInt16', 'UInt32', 'UInt64'} nullable_int_columns = [ column for column in self.new_dataframe.columns if str(self.new_dataframe[column].dtype) in nullable_ints ] columns_with_nulls = [column for column in nullable_int_columns if self.new_dataframe[column].isnull().any()] if columns_with_nulls: warnings.warn( 'The following columns have null values that will be replaced by 0 due to shapely conventions: '\ f'{", ".join(columns_with_nulls)}' )
def check_srs_wgs84(self)
-
Raise an error if the new spatial reference system isn't WGS84 as required by geojson.
Raises
ValueError
- If the new SRS value can't be cast to an int (please log an issue if this occurs)
ValueError
- If the new SRS value isn't 4326.
Expand source code
def check_srs_wgs84(self): """Raise an error if the new spatial reference system isn't WGS84 as required by geojson. Raises: ValueError: If the new SRS value can't be cast to an int (please log an issue if this occurs) ValueError: If the new SRS value isn't 4326. """ #: If we modify a spatial data frame, sometimes the .sr.wkid property/dictionary becomes {0:number} instead #: of {'wkid': number} try: new_srs = self.new_dataframe.spatial.sr.wkid except AttributeError: new_srs = self.new_dataframe.spatial.sr[0] try: new_srs = int(new_srs) except ValueError as error: raise ValueError('Could not cast new SRS to int') from error if new_srs != 4326: raise ValueError( f'New dataframe SRS {new_srs} is not wkid 4326. Reproject with appropriate transformation.' )
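If this check raises, a hedged remedy is to reproject before re-checking; new_df is a placeholder, and the GeoAccessor's project method needs arcpy or pyproj plus an appropriate transformation for your data:
    new_df.spatial.project(4326)  #: reprojects the SHAPE column in place to WGS84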
class Geocoding
-
Methods for geocoding an address
Expand source code
class Geocoding: """Methods for geocoding an address """ @staticmethod def geocode_addr(street, zone, api_key, rate_limits, **api_args): """Geocode an address through the UGRC Web API geocoder Invalid results are returned with an x,y of 0,0, a score of 0.0, and a match address of 'No Match' Args: street (str): The street address zone (str): The zip code or city api_key (str): API key obtained from developer.mapserv.utah.gov rate_limits (Tuple <float>): A lower and upper bound in seconds for pausing between API calls. Defaults to (0.015, 0.03) **api_args (dict): Keyword arguments to be passed as parameters in the API GET call. The API key will be added to this dict. Returns: tuple[int]: The match's x coordinate, y coordinate, score, and match address """ sleep(random.uniform(rate_limits[0], rate_limits[1])) url = f'https://api.mapserv.utah.gov/api/v1/geocode/{street}/{zone}' api_args['apiKey'] = api_key try: geocode_result_dict = retry(Geocoding._geocode_api_call, url, api_args) except Exception as error: module_logger.error(error) return (0, 0, 0., 'No API response') return ( geocode_result_dict['location']['x'], geocode_result_dict['location']['y'], geocode_result_dict['score'], geocode_result_dict['matchAddress'], ) @staticmethod def _geocode_api_call(url, api_args): """Makes a requests.get call to the geocoding API. Meant to be called through a retry wrapper so that the RuntimeErrors get tried again a couple times before finally raising the error. Args: url (str): Base url for GET request api_args (dict): Dictionary of URL parameters Raises: RuntimeError: If the server does not return response and request.get returns a falsy object. RuntimeError: If the server returns a status code other than 200 or 404 Returns: dict: The 'results' dictionary of the response json (location, score, and matchAddress) """ response = requests.get(url, params=api_args) #: The server times out and doesn't respond if response is None: module_logger.debug('GET call did not return a response') raise RuntimeError('No response from GET; request timeout?') #: The point does geocode if response.status_code == 200: return response.json()['result'] #: The point doesn't geocode if response.status_code == 404: return { 'location': { 'x': 0, 'y': 0 }, 'score': 0., 'matchAddress': 'No Match', } #: If we haven't returned, raise an error to trigger _retry raise RuntimeError(f'Did not receive a valid geocoding response; status code: {response.status_code}') @staticmethod def validate_api_key(api_key): """Check to see if a Web API key is valid by geocoding a single, known address point Args: api_key (str): API Key Raises: RuntimeError: If there was a network or other error attempting to geocode the known point ValueError: If the API responds with an invalid key message UserWarning: If the API responds with some other abnormal result """ url = 'https://api.mapserv.utah.gov/api/v1/geocode/326 east south temple street/slc' try: response = retry(requests.get, url=url, params={'apiKey': api_key}) except Exception as error: raise RuntimeError( 'Could not determine key validity; check your API key and/or network connection' ) from error response_json = response.json() if response_json['status'] == 200: return if response_json['status'] == 400 and 'Invalid API key' in response_json['message']: raise ValueError(f'API key validation failed: {response_json["message"]}') warnings.warn(f'Unhandled API key validation response {response_json["status"]}: {response_json["message"]}')
Static methods
def geocode_addr(street, zone, api_key, rate_limits, **api_args)
-
Geocode an address through the UGRC Web API geocoder
Invalid results are returned with an x,y of 0,0, a score of 0.0, and a match address of 'No Match'
Args
street
:str
- The street address
zone
:str
- The zip code or city
api_key
:str
- API key obtained from developer.mapserv.utah.gov
rate_limits
:Tuple[float]
- A lower and upper bound in seconds for pausing between API calls. Defaults to (0.015, 0.03)
**api_args
:dict
- Keyword arguments to be passed as parameters in the API GET call. The API key will be added to this dict.
Returns
tuple[int]
- The match's x coordinate, y coordinate, score, and match address
Expand source code
@staticmethod def geocode_addr(street, zone, api_key, rate_limits, **api_args): """Geocode an address through the UGRC Web API geocoder Invalid results are returned with an x,y of 0,0, a score of 0.0, and a match address of 'No Match' Args: street (str): The street address zone (str): The zip code or city api_key (str): API key obtained from developer.mapserv.utah.gov rate_limits (Tuple <float>): A lower and upper bound in seconds for pausing between API calls. Defaults to (0.015, 0.03) **api_args (dict): Keyword arguments to be passed as parameters in the API GET call. The API key will be added to this dict. Returns: tuple[int]: The match's x coordinate, y coordinate, score, and match address """ sleep(random.uniform(rate_limits[0], rate_limits[1])) url = f'https://api.mapserv.utah.gov/api/v1/geocode/{street}/{zone}' api_args['apiKey'] = api_key try: geocode_result_dict = retry(Geocoding._geocode_api_call, url, api_args) except Exception as error: module_logger.error(error) return (0, 0, 0., 'No API response') return ( geocode_result_dict['location']['x'], geocode_result_dict['location']['y'], geocode_result_dict['score'], geocode_result_dict['matchAddress'], )
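A hedged usage sketch; 'your-api-key' is a placeholder, and spatialReference is only an illustrative extra parameter forwarded through **api_args:
    from palletjack import utils

    x, y, score, match_address = utils.Geocoding.geocode_addr(
        '326 east south temple street',
        'slc',
        'your-api-key',  #: placeholder UGRC Web API key
        (0.015, 0.03),
        spatialReference=26912,  #: illustrative extra API parameter
    )
    if match_address in ('No Match', 'No API response'):
        print('address did not geocode')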
def validate_api_key(api_key)
-
Check to see if a Web API key is valid by geocoding a single, known address point
Args
api_key
:str
- API Key
Raises
RuntimeError
- If there was a network or other error attempting to geocode the known point
ValueError
- If the API responds with an invalid key message
UserWarning
- If the API responds with some other abnormal result
Expand source code
@staticmethod def validate_api_key(api_key): """Check to see if a Web API key is valid by geocoding a single, known address point Args: api_key (str): API Key Raises: RuntimeError: If there was a network or other error attempting to geocode the known point ValueError: If the API responds with an invalid key message UserWarning: If the API responds with some other abnormal result """ url = 'https://api.mapserv.utah.gov/api/v1/geocode/326 east south temple street/slc' try: response = retry(requests.get, url=url, params={'apiKey': api_key}) except Exception as error: raise RuntimeError( 'Could not determine key validity; check your API key and/or network connection' ) from error response_json = response.json() if response_json['status'] == 200: return if response_json['status'] == 400 and 'Invalid API key' in response_json['message']: raise ValueError(f'API key validation failed: {response_json["message"]}') warnings.warn(f'Unhandled API key validation response {response_json["status"]}: {response_json["message"]}')
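A hedged sketch of failing fast before a long geocoding run; 'your-api-key' is a placeholder:
    from palletjack import utils

    try:
        utils.Geocoding.validate_api_key('your-api-key')
    except (ValueError, RuntimeError) as error:
        raise SystemExit(f'Geocoding key problem: {error}')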