Module palletjack.transform

Transform pandas dataframes in preparation for loading to AGOL.

Expand source code
"""Transform pandas dataframes in preparation for loading to AGOL.
"""
import locale
import logging
import warnings
from datetime import datetime

import arcgis
import pandas as pd
from arcgis import GeoAccessor, GeoSeriesAccessor  #: importing these registers the DataFrame.spatial accessor

from palletjack import utils

module_logger = logging.getLogger(__name__)


class APIGeocoder:
    """Geocode a dataframe using the UGRC Web API Geocoder.

    Instantiate an APIGeocoder object with an API key from developer.mapserv.utah.gov. It will attempt to validate the
    API key. If validation fails, it will raise one of the following errors:

    - RuntimeError: If there was a network or other error
    - ValueError: If the key is invalid
    - UserWarning: If the API responds with some other abnormal result

    The individual geocoding steps are exposed in the `palletjack.utils.Geocoding` class in the utils module for use in
    other settings.
    """

    def __init__(self, api_key):
        """
        Args:
            api_key (str): API key obtained from developer.mapserv.utah.gov
        """
        self.api_key = api_key
        self._class_logger = logging.getLogger(__name__).getChild(self.__class__.__name__)
        utils.Geocoding.validate_api_key(self.api_key)

    def geocode_dataframe(self, dataframe, street_col, zone_col, wkid, rate_limits=(0.015, 0.03), **api_args):
        """Geocode a pandas dataframe into a spatially-enabled dataframe

        Addresses that don't meet the geocoding threshold (score > 70) are returned as points at (0, 0)

        Args:
            dataframe (pd.DataFrame): Input data with separate columns for street address and zip or city
            street_col (str): The column containing the street address
            zone_col (str): The column containing either the zip code or the city name
            wkid (int): The projection to return the x/y points in
            rate_limits (Tuple[float, float]): A lower and upper bound in seconds for pausing between API calls.
                Defaults to (0.015, 0.03)
            **api_args (dict): Keyword arguments to be passed as parameters in the API GET call.

        Returns:
            pd.DataFrame.spatial: Geocoded data as a spatially-enabled DataFrame
        """

        start = datetime.now()

        #: Should this return? Should it raise an error instead?
        if dataframe.empty:
            warnings.warn('No records to geocode (empty dataframe)', RuntimeWarning)

        dataframe_length = len(dataframe.index)
        reporting_interval = utils.calc_modulus_for_reporting_interval(dataframe_length)
        self._class_logger.info('Geocoding %s rows...', dataframe_length)

        street_col_index = dataframe.columns.get_loc(street_col)
        zone_col_index = dataframe.columns.get_loc(zone_col)
        new_rows = []
        for i, row in enumerate(dataframe.itertuples(index=False)):
            if i % reporting_interval == 0:
                self._class_logger.info('Geocoding row %s of %s, %s%%', i, dataframe_length, i / dataframe_length * 100)
            row_dict = row._asdict()
            results = utils.Geocoding.geocode_addr(
                row[street_col_index],
                row[zone_col_index],
                self.api_key,
                rate_limits,
                spatialReference=str(wkid),
                **api_args
            )
            self._class_logger.debug(
                '%s of %s: %s, %s = %s', i, dataframe_length, row[street_col_index], row[zone_col_index], results
            )
            row_dict['x'], row_dict['y'], row_dict['score'], row_dict['matchAddress'] = results
            new_rows.append(row_dict)

        spatial_dataframe = pd.DataFrame.spatial.from_xy(pd.DataFrame(new_rows), 'x', 'y', sr=int(wkid))

        end = datetime.now()
        self._class_logger.info('%s Records geocoded in %s', len(spatial_dataframe.index), (end - start))
        try:
            self._class_logger.debug('Average time per record: %s', (end - start) / len(spatial_dataframe.index))
        except ZeroDivisionError:
            warnings.warn('Empty spatial dataframe after geocoding', RuntimeWarning)
        return spatial_dataframe


class FeatureServiceMerging:
    """Get the live dataframe from a feature service and update it from another dataframe
    """

    @staticmethod
    def update_live_data_with_new_data(live_dataframe, new_dataframe, join_column):
        """Update a dataframe with data from another

        Args:
            live_dataframe (pd.DataFrame): The dataframe containing info to be updated
            new_dataframe (pd.DataFrame): Dataframe containing source info to use in the update
            join_column (str): The column with unique IDs to be used as a key between the two dataframes

        Raises:
            ValueError: If the join_column is missing from either live or new data
            RuntimeWarning: If there are rows in the new data that are not found in the live data; these will not be
                added to the live dataframe.

        Returns:
            pd.DataFrame: The updated dataframe, with data types converted via .convert_dtypes()
        """

        try:
            live_dataframe.set_index(join_column, inplace=True)
            new_dataframe.set_index(join_column, inplace=True)
        except KeyError as error:
            raise ValueError('Join column not found in live or new dataframes') from error

        indicator_dataframe = live_dataframe.merge(new_dataframe, on=join_column, how='outer', indicator=True)
        new_only_dataframe = indicator_dataframe[indicator_dataframe['_merge'] == 'right_only']
        if not new_only_dataframe.empty:
            keys_not_found = list(new_only_dataframe.index)
            warnings.warn(
                f'The following keys from the new data were not found in the existing dataset: {keys_not_found}',
                RuntimeWarning
            )

        live_dataframe.update(new_dataframe)
        return (live_dataframe.reset_index().convert_dtypes())

    @staticmethod
    def get_live_dataframe(gis, feature_service_itemid, layer_index=0):
        """Get a spatially-enabled dataframe representation of a hosted feature layer

        Args:
            gis (arcgis.gis.GIS): GIS object of the desired organization
            feature_service_itemid (str): itemid in the gis of the desired hosted feature service
            layer_index (int, optional): Index of the desired layer within the hosted feature service. Defaults to 0.

        Raises:
            RuntimeError: If it fails to load the data

        Returns:
            pd.DataFrame.spatial: Spatially-enabled dataframe representation of the hosted feature layer
        """

        try:
            feature_layer = arcgis.features.FeatureLayer.fromitem(
                gis.content.get(feature_service_itemid), layer_id=layer_index
            )
            live_dataframe = feature_layer.query(as_df=True)
        except Exception as error:
            raise RuntimeError('Failed to load live dataframe') from error

        return live_dataframe


class DataCleaning:
    """Static methods for cleaning dataframes prior to uploading to AGOL
    """

    @staticmethod
    def switch_to_nullable_int(dataframe, fields_that_should_be_ints):
        """Convert specified fields to panda's nullable Int64 type to preserve int to EsriFieldTypeInteger mapping

        Args:
            dataframe (pd.DataFrame): Input dataframe with columns to be converted
            fields_that_should_be_ints (list[str]): List of column names to be converted

        Raises:
            TypeError: If any of the conversions fail. Often caused by values that aren't int-castable floats (i.e., x.0)
                or np.nans.

        Returns:
            pd.DataFrame: Input dataframe with columns converted to nullable Int64
        """

        retyped = dataframe.copy()
        try:
            for field in fields_that_should_be_ints:
                retyped[field] = DataCleaning._switch_series_to_numeric_dtype(retyped[field], 'Int64')
        except (TypeError, ValueError) as error:
            raise TypeError(
                'Cannot convert one or more fields to nullable ints. Check for non-int/non-np.nan values.'
            ) from error
        return retyped

    @staticmethod
    def switch_to_float(dataframe, fields_that_should_be_floats):
        """Convert specified fields to float, converting empty strings to None first as required

        Args:
            dataframe (pd.DataFrame): Input dataframe with columns to be converted
            fields_that_should_be_floats (list[str]): List of column names to be converted

        Raises:
            TypeError: If any of the conversions fail. Often caused by values that aren't castable to floats
                (non-empty, non-numeric strings, etc)

        Returns:
            pd.DataFrame: Input dataframe with columns converted to float
        """

        retyped = dataframe.copy()
        try:
            for field in fields_that_should_be_floats:
                retyped[field] = DataCleaning._switch_series_to_numeric_dtype(retyped[field], 'float')
        except (TypeError, ValueError) as error:
            raise TypeError(
                'Cannot convert one or more fields to floats. Check for non-float/non-null values.'
            ) from error
        return retyped

    @staticmethod
    def _switch_series_to_numeric_dtype(series, dtype):
        """Switch the dtype of a series to the specified dtype

        Series of dtype 'object' (i.e., series of strings or mixed strings and numbers) are converted to str so that they
        can be de-localized to remove comma thousands separators

        Args:
            series (pd.Series): The series to be converted
            dtype (str): The dtype to convert to

        Returns:
            pd.Series: The converted series
        """

        if series.dtype == 'object':
            series = series.astype(str).apply(locale.delocalize)
            series.replace('', None, inplace=True)
        return series.astype(dtype)

    @staticmethod
    def switch_to_datetime(dataframe, date_fields, **to_datetime_kwargs):
        """Convert specified fields to datetime dtypes to ensure proper date formatting for AGOL

        Args:
            dataframe (pd.DataFrame): The source dataframe
            date_fields (List[str]): The fields to convert to datetime
            **to_datetime_kwargs (keyword arguments, optional): Arguments to pass through to pd.to_datetime

        Returns:
            pd.DataFrame: The source dataframe with converted fields.
        """

        for field in date_fields:
            dataframe[field] = pd.to_datetime(dataframe[field], **to_datetime_kwargs) \
                                    .dt.as_unit('ns') \
                                    .dt.tz_localize(None)

        return dataframe

    @staticmethod
    def rename_dataframe_columns_for_agol(dataframe):
        """Rename all the columns in a dataframe to valid AGOL column names

        Args:
            dataframe (pd.DataFrame): Dataframe to be renamed

        Returns:
            pd.DataFrame: Input dataframe with renamed columns
        """

        rename_dict = utils.rename_columns_for_agol(dataframe.columns)
        renamed_dataframe = dataframe.rename(columns=rename_dict)
        return renamed_dataframe

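A hypothetical end-to-end sketch tying the classes below together; the CSV path, column names, API key, and WKID (26912, UTM Zone 12N, a common choice for Utah data) are illustrative placeholders, not values from this module:

import pandas as pd

from palletjack import transform

raw_dataframe = pd.read_csv('addresses.csv')  #: placeholder path

#: Clean types and column names before geocoding/uploading
cleaned = transform.DataCleaning.switch_to_nullable_int(raw_dataframe, ['unit_count'])  #: placeholder column
cleaned = transform.DataCleaning.rename_dataframe_columns_for_agol(cleaned)

#: Geocode with a key from developer.mapserv.utah.gov
geocoder = transform.APIGeocoder('my-api-key')  #: placeholder key
spatial_dataframe = geocoder.geocode_dataframe(cleaned, 'street', 'zip', 26912)
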
Classes

class APIGeocoder (api_key)

Geocode a dataframe using the UGRC Web API Geocoder.

Instantiate an APIGeocoder object with an API key from developer.mapserv.utah.gov. It will attempt to validate the API key. If validation fails, it will raise one of the following errors:

  • RuntimeError: If there was a network or other error
  • ValueError: If the key is invalid
  • UserWarning: If the API responds with some other abnormal result

The individual geocoding steps are exposed in the Geocoding class in the utils module for use in other settings.

Args

api_key : str
API key obtained from developer.mapserv.utah.gov
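
A minimal sketch of guarding the key validation at instantiation ('my-api-key' is a placeholder):

from palletjack import transform

try:
    geocoder = transform.APIGeocoder('my-api-key')  #: placeholder key
except ValueError:
    print('API key is invalid')
except RuntimeError:
    print('Network or other error while validating the key')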
Expand source code
class APIGeocoder:
    """Geocode a dataframe using the UGRC Web API Geocoder.

    Instantiate an APIGeocoder object with an API key from developer.mapserv.utah.gov. It will attempt to validate the
    API key. If validation fails, it will raise one of the following errors:

    - RuntimeError: If there was a network or other error
    - ValueError: If the key is invalid
    - UserWarning: If the API responds with some other abnormal result

    The individual geocoding steps are exposed in the `palletjack.utils.Geocoding` class in the utils module for use in
    other settings.
    """

    def __init__(self, api_key):
        """
        Args:
            api_key (str): API key obtained from developer.mapserv.utah.gov
        """
        self.api_key = api_key
        self._class_logger = logging.getLogger(__name__).getChild(self.__class__.__name__)
        utils.Geocoding.validate_api_key(self.api_key)

    def geocode_dataframe(self, dataframe, street_col, zone_col, wkid, rate_limits=(0.015, 0.03), **api_args):
        """Geocode a pandas dataframe into a spatially-enabled dataframe

        Addresses that don't meet the geocoding threshold (score > 70) are returned as points at (0, 0)

        Args:
            dataframe (pd.DataFrame): Input data with separate columns for street address and zip or city
            street_col (str): The column containing the street address
            zone_col (str): The column containing either the zip code or the city name
            wkid (int): The projection to return the x/y points in
            rate_limits (Tuple[float, float]): A lower and upper bound in seconds for pausing between API calls.
                Defaults to (0.015, 0.03)
            **api_args (dict): Keyword arguments to be passed as parameters in the API GET call.

        Returns:
            pd.DataFrame.spatial: Geocoded data as a spatially-enabled DataFrame
        """

        start = datetime.now()

        #: Should this return? Should it raise an error instead?
        if dataframe.empty:
            warnings.warn('No records to geocode (empty dataframe)', RuntimeWarning)

        dataframe_length = len(dataframe.index)
        reporting_interval = utils.calc_modulus_for_reporting_interval(dataframe_length)
        self._class_logger.info('Geocoding %s rows...', dataframe_length)

        street_col_index = dataframe.columns.get_loc(street_col)
        zone_col_index = dataframe.columns.get_loc(zone_col)
        new_rows = []
        for i, row in enumerate(dataframe.itertuples(index=False)):
            if i % reporting_interval == 0:
                self._class_logger.info('Geocoding row %s of %s, %s%%', i, dataframe_length, i / dataframe_length * 100)
            row_dict = row._asdict()
            results = utils.Geocoding.geocode_addr(
                row[street_col_index],
                row[zone_col_index],
                self.api_key,
                rate_limits,
                spatialReference=str(wkid),
                **api_args
            )
            self._class_logger.debug(
                '%s of %s: %s, %s = %s', i, dataframe_length, row[street_col_index], row[zone_col_index], results
            )
            row_dict['x'], row_dict['y'], row_dict['score'], row_dict['matchAddress'] = results
            new_rows.append(row_dict)

        spatial_dataframe = pd.DataFrame.spatial.from_xy(pd.DataFrame(new_rows), 'x', 'y', sr=int(wkid))

        end = datetime.now()
        self._class_logger.info('%s Records geocoded in %s', len(spatial_dataframe.index), (end - start))
        try:
            self._class_logger.debug('Average time per record: %s', (end - start) / len(spatial_dataframe.index))
        except ZeroDivisionError:
            warnings.warn('Empty spatial dataframe after geocoding', RuntimeWarning)
        return spatial_dataframe

Methods

def geocode_dataframe(self, dataframe, street_col, zone_col, wkid, rate_limits=(0.015, 0.03), **api_args)

Geocode a pandas dataframe into a spatially-enabled dataframe

Addresses that don't meet the geocoding threshold (score > 70) are returned as points at (0, 0)

Args

dataframe : pd.DataFrame
Input data with separate columns for street address and zip or city
street_col : str
The column containing the street address
zone_col : str
The column containing either the zip code or the city name
wkid : int
The projection to return the x/y points in
rate_limits : Tuple[float, float]
A lower and upper bound in seconds for pausing between API calls. Defaults to (0.015, 0.03)
**api_args : dict
Keyword arguments to be passed as parameters in the API GET call.

Returns

pd.DataFrame.spatial
Geocoded data as a spatially-enabled DataFrame
Expand source code
def geocode_dataframe(self, dataframe, street_col, zone_col, wkid, rate_limits=(0.015, 0.03), **api_args):
    """Geocode a pandas dataframe into a spatially-enabled dataframe

    Addresses that don't meet the geocoding threshold (score > 70) are returned as points at (0, 0)

    Args:
        dataframe (pd.DataFrame): Input data with separate columns for street address and zip or city
        street_col (str): The column containing the street address
        zone_col (str): The column containing either the zip code or the city name
        wkid (int): The projection to return the x/y points in
        rate_limits (Tuple[float, float]): A lower and upper bound in seconds for pausing between API calls.
            Defaults to (0.015, 0.03)
        **api_args (dict): Keyword arguments to be passed as parameters in the API GET call.

    Returns:
        pd.DataFrame.spatial: Geocoded data as a spatially-enabled DataFrame
    """

    start = datetime.now()

    #: Should this return? Should it raise an error instead?
    if dataframe.empty:
        warnings.warn('No records to geocode (empty dataframe)', RuntimeWarning)

    dataframe_length = len(dataframe.index)
    reporting_interval = utils.calc_modulus_for_reporting_interval(dataframe_length)
    self._class_logger.info('Geocoding %s rows...', dataframe_length)

    street_col_index = dataframe.columns.get_loc(street_col)
    zone_col_index = dataframe.columns.get_loc(zone_col)
    new_rows = []
    for i, row in enumerate(dataframe.itertuples(index=False)):
        if i % reporting_interval == 0:
            self._class_logger.info('Geocoding row %s of %s, %s%%', i, dataframe_length, i / dataframe_length * 100)
        row_dict = row._asdict()
        results = utils.Geocoding.geocode_addr(
            row[street_col_index],
            row[zone_col_index],
            self.api_key,
            rate_limits,
            spatialReference=str(wkid),
            **api_args
        )
        self._class_logger.debug(
            '%s of %s: %s, %s = %s', i, dataframe_length, row[street_col_index], row[zone_col_index], results
        )
        row_dict['x'], row_dict['y'], row_dict['score'], row_dict['matchAddress'] = results
        new_rows.append(row_dict)

    spatial_dataframe = pd.DataFrame.spatial.from_xy(pd.DataFrame(new_rows), 'x', 'y', sr=int(wkid))

    end = datetime.now()
    self._class_logger.info('%s Records geocoded in %s', len(spatial_dataframe.index), (end - start))
    try:
        self._class_logger.debug('Average time per record: %s', (end - start) / len(spatial_dataframe.index))
    except ZeroDivisionError:
        warnings.warn('Empty spatial dataframe after geocoding', RuntimeWarning)
    return spatial_dataframe
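
A usage sketch with an illustrative two-row dataframe; the key is a placeholder and 26912 (UTM Zone 12N) is just one reasonable WKID:

import pandas as pd

from palletjack import transform

dataframe = pd.DataFrame({
    'street': ['123 S Main St', '456 E 200 N'],
    'zip': ['84111', '84321'],
})
geocoder = transform.APIGeocoder('my-api-key')  #: placeholder key
spatial_dataframe = geocoder.geocode_dataframe(dataframe, 'street', 'zip', 26912)
#: spatial_dataframe gains x, y, score, and matchAddress columns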
class DataCleaning

Static methods for cleaning dataframes prior to uploading to AGOL

Expand source code
class DataCleaning:
    """Static methods for cleaning dataframes prior to uploading to AGOL
    """

    @staticmethod
    def switch_to_nullable_int(dataframe, fields_that_should_be_ints):
        """Convert specified fields to panda's nullable Int64 type to preserve int to EsriFieldTypeInteger mapping

        Args:
            dataframe (pd.DataFrame): Input dataframe with columns to be converted
            fields_that_should_be_ints (list[str]): List of column names to be converted

        Raises:
            TypeError: If any of the conversions fail. Often caused by values that aren't int-castable floats (i.e., x.0)
                or np.nans.

        Returns:
            pd.DataFrame: Input dataframe with columns converted to nullable Int64
        """

        retyped = dataframe.copy()
        try:
            for field in fields_that_should_be_ints:
                retyped[field] = DataCleaning._switch_series_to_numeric_dtype(retyped[field], 'Int64')
        except (TypeError, ValueError) as error:
            raise TypeError(
                'Cannot convert one or more fields to nullable ints. Check for non-int/non-np.nan values.'
            ) from error
        return retyped

    @staticmethod
    def switch_to_float(dataframe, fields_that_should_be_floats):
        """Convert specified fields to float, converting empty strings to None first as required

        Args:
            dataframe (pd.DataFrame): Input dataframe with columns to be converted
            fields_that_should_be_floats (list[str]): List of column names to be converted

        Raises:
            TypeError: If any of the conversions fail. Often caused by values that aren't castable to floats
                (non-empty, non-numeric strings, etc)

        Returns:
            pd.DataFrame: Input dataframe with columns converted to float
        """

        retyped = dataframe.copy()
        try:
            for field in fields_that_should_be_floats:
                retyped[field] = DataCleaning._switch_series_to_numeric_dtype(retyped[field], 'float')
        except (TypeError, ValueError) as error:
            raise TypeError(
                'Cannot convert one or more fields to floats. Check for non-float/non-null values.'
            ) from error
        return retyped

    @staticmethod
    def _switch_series_to_numeric_dtype(series, dtype):
        """Switch the dtype of a series to the specified dtype

        Series of dtype 'object' (i.e., series of strings or mixed strings and numbers) are converted to str so that they
        can be de-localized to remove comma thousands separators

        Args:
            series (pd.Series): The series to be converted
            dtype (str): The dtype to convert to

        Returns:
            pd.Series: The converted series
        """

        if series.dtype == 'object':
            series = series.astype(str).apply(locale.delocalize)
            series.replace('', None, inplace=True)
        return series.astype(dtype)

    @staticmethod
    def switch_to_datetime(dataframe, date_fields, **to_datetime_kwargs):
        """Convert specified fields to datetime dtypes to ensure proper date formatting for AGOL

        Args:
            dataframe (pd.DataFrame): The source dataframe
            date_fields (List[str]): The fields to convert to datetime
            **to_datetime_kwargs (keyword arguments, optional): Arguments to pass through to pd.to_datetime

        Returns:
            pd.DataFrame: The source dataframe with converted fields.
        """

        for field in date_fields:
            dataframe[field] = pd.to_datetime(dataframe[field], **to_datetime_kwargs) \
                                    .dt.as_unit('ns') \
                                    .dt.tz_localize(None)

        return dataframe

    @staticmethod
    def rename_dataframe_columns_for_agol(dataframe):
        """Rename all the columns in a dataframe to valid AGOL column names

        Args:
            dataframe (pd.DataFrame): Dataframe to be renamed

        Returns:
            pd.DataFrame: Input dataframe with renamed columns
        """

        rename_dict = utils.rename_columns_for_agol(dataframe.columns)
        renamed_dataframe = dataframe.rename(columns=rename_dict)
        return renamed_dataframe

Static methods

def rename_dataframe_columns_for_agol(dataframe)

Rename all the columns in a dataframe to valid AGOL column names

Args

dataframe : pd.DataFrame
Dataframe to be renamed

Returns

pd.DataFrame
Input dataframe with renamed columns
Expand source code
@staticmethod
def rename_dataframe_columns_for_agol(dataframe):
    """Rename all the columns in a dataframe to valid AGOL column names

    Args:
        dataframe (pd.DataFrame): Dataframe to be renamed

    Returns:
        pd.DataFrame: Input dataframe with renamed columns
    """

    rename_dict = utils.rename_columns_for_agol(dataframe.columns)
    renamed_dataframe = dataframe.rename(columns=rename_dict)
    return renamed_dataframe
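
A sketch of the expected effect; the exact renaming rules live in palletjack.utils.rename_columns_for_agol, so the printed output is illustrative only:

import pandas as pd

from palletjack import transform

dataframe = pd.DataFrame({'Street Address': ['123 S Main St'], '2020 Count': [4]})
renamed = transform.DataCleaning.rename_dataframe_columns_for_agol(dataframe)
print(list(renamed.columns))  #: illustrative: ['Street_Address', '_2020_Count']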
def switch_to_datetime(dataframe, date_fields, **to_datetime_kwargs)

Convert specified fields to datetime dtypes to ensure proper date formatting for AGOL

Args

dataframe : pd.DataFrame
The source dataframe
date_fields : List[str]
The fields to convert to datetime
**to_datetime_kwargs : keyword arguments, optional
Arguments to pass through to pd.to_datetime

Returns

pd.DataFrame
The source dataframe with converted fields.
Expand source code
@staticmethod
def switch_to_datetime(dataframe, date_fields, **to_datetime_kwargs):
    """Convert specified fields to datetime dtypes to ensure proper date formatting for AGOL

    Args:
        dataframe (pd.DataFrame): The source dataframe
        date_fields (List[str]): The fields to convert to datetime
        **to_datetime_kwargs (keyword arguments, optional): Arguments to pass through to pd.to_datetime

    Returns:
        pd.DataFrame: The source dataframe with converted fields.
    """

    for field in date_fields:
        dataframe[field] = pd.to_datetime(dataframe[field], **to_datetime_kwargs) \
                                .dt.as_unit('ns') \
                                .dt.tz_localize(None)

    return dataframe
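
A sketch with ISO-formatted string dates; format= is passed through to pd.to_datetime, and the .dt.as_unit call assumes pandas 2.0 or newer:

import pandas as pd

from palletjack import transform

dataframe = pd.DataFrame({'last_updated': ['2023-01-15', '2023-02-01', None]})
converted = transform.DataCleaning.switch_to_datetime(dataframe, ['last_updated'], format='%Y-%m-%d')
print(converted['last_updated'].dtype)  #: datetime64[ns]; the None becomes NaT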
def switch_to_float(dataframe, fields_that_should_be_floats)

Convert specified fields to float, converting empty strings to None first as required

Args

dataframe : pd.DataFrame
Input dataframe with columns to be converted
fields_that_should_be_floats : list[str]
List of column names to be converted

Raises

TypeError
If any of the conversions fail. Often caused by values that aren't castable to floats (non-empty, non-numeric strings, etc)

Returns

pd.DataFrame
Input dataframe with columns converted to float
Expand source code
@staticmethod
def switch_to_float(dataframe, fields_that_should_be_floats):
    """Convert specified fields to float, converting empty strings to None first as required

    Args:
        dataframe (pd.DataFrame): Input dataframe with columns to be converted
        fields_that_should_be_floats (list[str]): List of column names to be converted

    Raises:
        TypeError: If any of the conversions fail. Often caused by values that aren't castable to floats
            (non-empty, non-numeric strings, etc)

    Returns:
        pd.DataFrame: Input dataframe with columns converted to float
    """

    retyped = dataframe.copy()
    try:
        for field in fields_that_should_be_floats:
            retyped[field] = DataCleaning._switch_series_to_numeric_dtype(retyped[field], 'float')
    except (TypeError, ValueError) as error:
        raise TypeError(
            'Cannot convert one or more fields to floats. Check for non-float/non-null values.'
        ) from error
    return retyped
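
A sketch showing empty strings becoming NaN; comma thousands separators are stripped via locale.delocalize, so their handling depends on the process locale:

import pandas as pd

from palletjack import transform

dataframe = pd.DataFrame({'acreage': ['1234.5', '', '42']})
retyped = transform.DataCleaning.switch_to_float(dataframe, ['acreage'])
print(retyped['acreage'].tolist())  #: [1234.5, nan, 42.0]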
def switch_to_nullable_int(dataframe, fields_that_should_be_ints)

Convert specified fields to pandas' nullable Int64 type to preserve the int-to-esriFieldTypeInteger mapping

Args

dataframe : pd.DataFrame
Input dataframe with columns to be converted
fields_that_should_be_ints : list[str]
List of column names to be converted

Raises

TypeError
If any of the conversions fail. Often caused by values that aren't int-castable floats (i.e., x.0) or np.nans.

Returns

pd.DataFrame
Input dataframe with columns converted to nullable Int64
Expand source code
@staticmethod
def switch_to_nullable_int(dataframe, fields_that_should_be_ints):
    """Convert specified fields to panda's nullable Int64 type to preserve int to EsriFieldTypeInteger mapping

    Args:
        dataframe (pd.DataFrame): Input dataframe with columns to be converted
        fields_that_should_be_ints (list[str]): List of column names to be converted

    Raises:
        TypeError: If any of the conversions fail. Often caused by values that aren't int-castable floats (i.e., x.0)
            or np.nans.

    Returns:
        pd.DataFrame: Input dataframe with columns converted to nullable Int64
    """

    retyped = dataframe.copy()
    try:
        for field in fields_that_should_be_ints:
            retyped[field] = DataCleaning._switch_series_to_numeric_dtype(retyped[field], 'Int64')
    except (TypeError, ValueError) as error:
        raise TypeError(
            'Cannot convert one or more fields to nullable ints. Check for non-int/non-np.nan values.'
        ) from error
    return retyped
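
A sketch of the common case: a column of whole-number floats (forced to float by np.nans) converted to nullable ints:

import numpy as np
import pandas as pd

from palletjack import transform

dataframe = pd.DataFrame({'unit_count': [1.0, 2.0, np.nan]})
retyped = transform.DataCleaning.switch_to_nullable_int(dataframe, ['unit_count'])
print(retyped['unit_count'].dtype)  #: Int64
print(retyped['unit_count'].tolist())  #: [1, 2, <NA>]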
class FeatureServiceMerging

Get the live dataframe from a feature service and update it from another dataframe

Expand source code
class FeatureServiceMerging:
    """Get the live dataframe from a feature service and update it from another dataframe
    """

    @staticmethod
    def update_live_data_with_new_data(live_dataframe, new_dataframe, join_column):
        """Update a dataframe with data from another

        Args:
            live_dataframe (pd.DataFrame): The dataframe containing info to be updated
            new_dataframe (pd.DataFrame): Dataframe containing source info to use in the update
            join_column (str): The column with unique IDs to be used as a key between the two dataframes

        Raises:
            ValueError: If the join_column is missing from either live or new data
            RuntimeWarning: If there are rows in the new data that are not found in the live data; these will not be
                added to the live dataframe.

        Returns:
            pd.DataFrame: The updated dataframe, with data types converted via .convert_dtypes()
        """

        try:
            live_dataframe.set_index(join_column, inplace=True)
            new_dataframe.set_index(join_column, inplace=True)
        except KeyError as error:
            raise ValueError('Join column not found in live or new dataframes') from error

        indicator_dataframe = live_dataframe.merge(new_dataframe, on=join_column, how='outer', indicator=True)
        new_only_dataframe = indicator_dataframe[indicator_dataframe['_merge'] == 'right_only']
        if not new_only_dataframe.empty:
            keys_not_found = list(new_only_dataframe.index)
            warnings.warn(
                f'The following keys from the new data were not found in the existing dataset: {keys_not_found}',
                RuntimeWarning
            )

        live_dataframe.update(new_dataframe)
        return (live_dataframe.reset_index().convert_dtypes())

    @staticmethod
    def get_live_dataframe(gis, feature_service_itemid, layer_index=0):
        """Get a spatially-enabled dataframe representation of a hosted feature layer

        Args:
            gis (arcgis.gis.GIS): GIS object of the desired organization
            feature_service_itemid (str): itemid in the gis of the desired hosted feature service
            layer_index (int, optional): Index of the desired layer within the hosted feature service. Defaults to 0.

        Raises:
            RuntimeError: If it fails to load the data

        Returns:
            pd.DataFrame.spatial: Spatially-enabled dataframe representation of the hosted feature layer
        """

        try:
            feature_layer = arcgis.features.FeatureLayer.fromitem(
                gis.content.get(feature_service_itemid), layer_id=layer_index
            )
            live_dataframe = feature_layer.query(as_df=True)
        except Exception as error:
            raise RuntimeError('Failed to load live dataframe') from error

        return live_dataframe

Static methods

def get_live_dataframe(gis, feature_service_itemid, layer_index=0)

Get a spatially-enabled dataframe representation of a hosted feature layer

Args

gis : arcgis.gis.GIS
GIS object of the desired organization
feature_service_itemid : str
itemid in the gis of the desired hosted feature service
layer_index : int, optional
Index of the desired layer within the hosted feature service. Defaults to 0.

Raises

RuntimeError
If it fails to load the data

Returns

pd.DataFrame.spatial
Spatially-enabled dataframe representation of the hosted feature layer
Expand source code
@staticmethod
def get_live_dataframe(gis, feature_service_itemid, layer_index=0):
    """Get a spatially-enabled dataframe representation of a hosted feature layer

    Args:
        gis (arcgis.gis.GIS): GIS object of the desired organization
        feature_service_itemid (str): itemid in the gis of the desired hosted feature service
        layer_index (int, optional): Index of the desired layer within the hosted feature service. Defaults to 0.

    Raises:
        RuntimeError: If it fails to load the data

    Returns:
        pd.DataFrame.spatial: Spatially-enabled dataframe representation of the hosted feature layer
    """

    try:
        feature_layer = arcgis.features.FeatureLayer.fromitem(
            gis.content.get(feature_service_itemid), layer_id=layer_index
        )
        live_dataframe = feature_layer.query(as_df=True)
    except Exception as error:
        raise RuntimeError('Failed to load live dataframe') from error

    return live_dataframe
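
A minimal sketch; the organization URL, credentials, and itemid are placeholders:

import arcgis

from palletjack import transform

gis = arcgis.gis.GIS('https://www.arcgis.com', 'username', 'password')  #: placeholder credentials
live_dataframe = transform.FeatureServiceMerging.get_live_dataframe(gis, 'itemid_of_service')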
def update_live_data_with_new_data(live_dataframe, new_dataframe, join_column)

Update a dataframe with data from another

Args

live_dataframe : pd.DataFrame
The dataframe containing info to be updated
new_dataframe : pd.DataFrame
Dataframe containing source info to use in the update
join_column : str
The column with unique IDs to be used as a key between the two dataframes

Raises

ValueError
If the join_column is missing from either live or new data
RuntimeWarning
If there are rows in the new data that are not found in the live data; these will not be added to the live dataframe.

Returns

pd.DataFrame
The updated dataframe, with data types converted via .convert_dtypes()
Expand source code
@staticmethod
def update_live_data_with_new_data(live_dataframe, new_dataframe, join_column):
    """Update a dataframe with data from another

    Args:
        live_dataframe (pd.DataFrame): The dataframe containing info to be updated
        new_dataframe (pd.DataFrame): Dataframe containing source info to use in the update
        join_column (str): The column with unique IDs to be used as a key between the two dataframes

    Raises:
        ValueError: If the join_column is missing from either live or new data
        RuntimeWarning: If there are rows in the new data that are not found in the live data; these will not be
            added to the live dataframe.

    Returns:
        pd.DataFrame: The updated dataframe, with data types converted via .convert_dtypes()
    """

    try:
        live_dataframe.set_index(join_column, inplace=True)
        new_dataframe.set_index(join_column, inplace=True)
    except KeyError as error:
        raise ValueError('Join column not found in live or new dataframes') from error

    indicator_dataframe = live_dataframe.merge(new_dataframe, on=join_column, how='outer', indicator=True)
    new_only_dataframe = indicator_dataframe[indicator_dataframe['_merge'] == 'right_only']
    if not new_only_dataframe.empty:
        keys_not_found = list(new_only_dataframe.index)
        warnings.warn(
            f'The following keys from the new data were not found in the existing dataset: {keys_not_found}',
            RuntimeWarning
        )

    live_dataframe.update(new_dataframe)
    return (live_dataframe.reset_index().convert_dtypes())
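
A sketch of the merge semantics: the matching key is updated in place, and the unmatched key (4) triggers a RuntimeWarning and is not added:

import pandas as pd

from palletjack import transform

live = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})
new = pd.DataFrame({'id': [2, 4], 'value': ['B', 'D']})
updated = transform.FeatureServiceMerging.update_live_data_with_new_data(live, new, 'id')
print(updated['value'].tolist())  #: ['a', 'B', 'c']; key 4 was not added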