Module palletjack.load
Modify existing ArcGIS Online content (mostly hosted feature services). Contains classes for updating hosted feature service data, modifying the attachments on a hosted feature service, or modifying map symbology.
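As a quick, hedged orientation (the organization URL, credentials, and item ID below are placeholders), the classes in this module are constructed with an authenticated arcgis.gis.GIS object and then driven by dataframes prepared in earlier extract/transform steps:
import arcgis
from palletjack import load
#: Placeholder credentials and item ID; substitute your organization's values
gis = arcgis.gis.GIS("https://www.arcgis.com", "my_user", "my_password")
updater = load.ServiceUpdater(gis, "0123456789abcdef0123456789abcdef")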
Expand source code
"""Modify existing ArcGIS Online content (mostly hosted feature services). Contains classes for updating hosted feature
service data, modifying the attachments on a hosted feature service, or modifying map symbology.
"""
import json
import logging
import shutil
import warnings
from datetime import datetime
from pathlib import Path
from typing import Literal
import arcgis
import geopandas as gpd
import numpy as np
import pandas as pd
import pyogrio
from arcgis.features import GeoAccessor, GeoSeriesAccessor # noqa: F401
from palletjack import utils
logger = logging.getLogger(__name__)
class ServiceUpdater:
"""Update an AGOL Feature or Table Service with data from a pandas DataFrame or Spatially-enabled Dataframe.
This class represents the feature layer or table that will be updated and stores a reference to the dataset and its containing gis. It contains four methods for updating the data: add, remove, update, and truncate_and_load.
It is the client's responsibility to separate out the new data into these different steps. If the extract/transform stages result in separate groups of records that need to be added, deleted, and updated, the client must call the three different methods with dataframes containing only the respective records for each operation.
The method used to upload the data to AGOL saves the updated data as a new layer or table named upload in working_dir/upload.gdb, zips the gdb, uploads it to AGOL (with the item name `gdb_item_prefix Temporary gdb upload`), and then uses this as the source data for a call to the feature layer or table's .append() method. The geodatabase upload.gdb will be created in working_dir if it doesn't already exist. Ideally, working_dir should be a TemporaryDirectory unless persistent access to the gdb is desired.
"""
def __init__(
self,
gis: arcgis.gis.GIS,
itemid: str,
service_type: Literal["layer", "table"] = "layer",
index: int = 0,
working_dir: Path | None = None,
gdb_item_prefix: str = "palletjack",
) -> None:
"""
Args:
gis (arcgis.gis.GIS): The AGOL organization's gis object
itemid (str): The AGOL item ID of the feature layer or table to update
service_type (Literal["layer", "table"], optional): The type of service to update. Defaults to "layer".
index (int, optional): The index of the layer or table within the item. Defaults to 0.
working_dir (Path, optional): The directory in which to save the gdb for uploading. Defaults to None.
gdb_item_prefix (str, optional): The prefix to use for the gdb item name. Defaults to "palletjack".
"""
self._class_logger = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.gis = gis
if service_type == "table":
self.service = arcgis.features.Table.fromitem(gis.content.get(itemid), table_id=index)
else:
self.service = arcgis.features.FeatureLayer.fromitem(gis.content.get(itemid), layer_id=index)
self.service_type = service_type
self.index = index
self.itemid = itemid
self.working_dir = working_dir
self.gdb_item_prefix = gdb_item_prefix
def add(self, dataframe: pd.DataFrame) -> int:
"""Adds new features/rows to existing hosted feature layer/table from a new dataframe.
If you are working with a feature layer, the new dataframe must have a 'SHAPE' column containing geometries of the same type as the live data.
The new dataframe's columns and data must match the existing data's fields (with the exception of generated fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and don't have a default value must have a value in the new data; missing data in these fields will raise an error.
Args:
dataframe (pd.DataFrame): Dataframe of data to be added
Raises:
ValueError: If the new field and existing fields don't match, the SHAPE field is missing or has an incompatible type, the new data contains null fields, the new data exceeds the existing field lengths, or a specified field is missing from either new or live data.
Returns:
int: Number of features added
"""
self._class_logger.info(
"Adding items to %s index `%s` in itemid `%s` in-place",
self.service_type,
self.index,
self.itemid,
)
fields = self.__class__._get_fields_from_dataframe(dataframe)
self._class_logger.debug("Using fields %s", fields)
#: Field checks to prevent various AGOL errors
utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=False)
#: Upload
append_count = self._upload_data(
dataframe,
upsert=False,
)
return append_count
def remove(self, delete_oids: list[int]) -> int:
"""Deletes features/rows from a hosted feature layer/table based on list of Object IDs
This is a wrapper around the arcgis.FeatureLayer/Table.delete_features methods that adds some sanity checking. The delete operation is rolled back if any of the features/rows fail to delete (using rollback_on_failure=True). This function will also raise a RuntimeError after delete() returns if any of them fail.
The sanity checks will raise errors or warnings as appropriate if any of them fail.
Args:
delete_oids (list[int]): List of OIDs to delete
Raises:
TypeError: If any of the items in delete_oids can't be cast to ints
ValueError: If delete_oids is empty
UserWarning: If any of the Object IDs in delete_oids don't exist in the live data
RuntimeError: If any of the OIDs fail to delete
Returns:
int: The number of features deleted
"""
self._class_logger.info(
"Deleting features from %s index `%s` in itemid `%s`",
self.service_type,
self.index,
self.itemid,
)
self._class_logger.debug("Delete string: %s", delete_oids)
#: Verify delete list
oid_numeric = utils.DeleteUtils.check_delete_oids_are_ints(delete_oids)
utils.DeleteUtils.check_for_empty_oid_list(oid_numeric, delete_oids)
delete_string = ",".join([str(oid) for oid in oid_numeric])
num_missing_oids = utils.DeleteUtils.check_delete_oids_are_in_live_data(
delete_string,
oid_numeric,
self.service,
)
#: Note: apparently not all services support rollback:
#: https://developers.arcgis.com/rest/services-reference/enterprise/delete-features.htm
deletes = utils.retry(
self.service.delete_features,
deletes=delete_string,
rollback_on_failure=True,
)
failed_deletes = [result["objectId"] for result in deletes["deleteResults"] if not result["success"]]
if failed_deletes:
raise RuntimeError(f"The following Object IDs failed to delete: {failed_deletes}")
#: The REST API still returns success: True on missing OIDs, so we have to track this ourselves
actual_delete_count = len(deletes["deleteResults"]) - num_missing_oids
return actual_delete_count
def update(self, dataframe: pd.DataFrame, update_geometry: bool = True) -> int:
"""Updates existing features/rows within a hosted feature layer/table using OBJECTID as the join field.
The new dataframe's columns and data must match the existing data's fields (with the exception of generated fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and don't have a default value must have a value in the new data; missing data in these fields will raise an error.
Uses the OBJECTID field to determine which features should be updated by the underlying FeatureLayer/Table.append() methods. The most robust way to do this is to load the live data as a dataframe, subset it down to the desired rows, make your edits based on a separate join id, and then pass that dataframe to this method.
The new data can have either attributes and geometries or only attributes based on the update_geometry flag. A combination of updates from a source with both attributes & geometries and a source with attributes-only must be done with two separate calls. The geometries must be provided in a SHAPE column and be the same type as the live data.
Args:
dataframe (pd.DataFrame): Dataframe of data to be updated
update_geometry (bool): Whether to update attributes and geometry (True) or just attributes (False). Defaults to True for feature layers and is always False for tables.
Raises:
ValueError: If the new field and existing fields don't match, the SHAPE field is missing or has an incompatible type, the new data contains null fields, the new data exceeds the existing field lengths, or a specified field is missing from either new or live data.
ValueError: If update_geometry is True for a table
Returns:
int: Number of features updated
"""
self._class_logger.info("Updating layer `%s` in itemid `%s` in-place", self.index, self.itemid)
fields = self.__class__._get_fields_from_dataframe(dataframe)
self._class_logger.debug("Updating fields %s", fields)
#: Field checks to prevent various AGOL errors
utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=True)
#: Upload data
count = self._upload_data(
dataframe,
upsert=True,
upsert_matching_field="OBJECTID",
append_fields=fields, #: Apparently this works if append_fields is all the fields, but not a subset?
update_geometry=update_geometry if self._is_feature_layer() else False,
)
return count
def truncate_and_load(self, dataframe: pd.DataFrame, save_old: bool = False) -> int:
"""Overwrite a hosted feature layer or table by truncating and loading the new data.
When the existing dataset is truncated, a copy is kept in memory as a dataframe. If save_old is set, this is saved as a layer in self.working_dir/backup.gdb with the layer/table name {name}_{todays_date}.json (foobar_2022-12-31.json).
If you are working with a feature layer, the new dataframe must have a 'SHAPE' column containing geometries of the same type as the live data. New OBJECTIDs will be automatically generated.
The new dataframe's columns and data must match the existing data's fields (with the exception of generated fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and don't have a default value must have a value in the new data; missing data in these fields will raise an error.
Args:
dataframe (pd.DataFrame): Spatially enabled dataframe of new data to be loaded
save_old (bool): Save existing data to backup.gdb in working_dir. Defaults to False
Returns:
int: Number of features loaded
"""
self._class_logger.info(
"Truncating and loading %s index `%s` in itemid `%s`",
self.service_type,
self.index,
self.itemid,
)
start = datetime.now()
#: Save the data to disk if desired
if save_old:
self._class_logger.info("Saving existing data to %s", self.working_dir)
saved_layer_path = utils.save_to_gdb(self.service, self.working_dir)
fields = self.__class__._get_fields_from_dataframe(dataframe)
#: Field checks to prevent various AGOL errors
utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=False)
self._class_logger.info("Truncating existing data...")
self._truncate_existing_data()
try:
self._class_logger.info("Loading new data...")
append_count = self._upload_data(dataframe, upsert=False)
self._class_logger.debug("Total truncate and load time: %s", datetime.now() - start)
except Exception:
if save_old:
self._class_logger.error("Append failed. Data saved to %s", saved_layer_path)
raise
self._class_logger.error("Append failed. Old data not saved (save_old set to False)")
raise
return append_count
@staticmethod
def _get_fields_from_dataframe(dataframe: pd.DataFrame) -> list[str]:
"""Get the fields from a dataframe, excluding Shape_Area and Shape_Length
Args:
dataframe (pd.DataFrame): Dataframe to get fields from
Returns:
list[str]: List of the columns of the dataframe, excluding Shape_Area and Shape_Length
"""
fields = list(dataframe.columns)
for auto_gen_field in ["Shape_Area", "Shape_Length"]:
try:
fields.remove(auto_gen_field)
except ValueError:
continue
return fields
def _upload_data(self, dataframe: pd.DataFrame, **append_kwargs) -> int:
"""Append a dataframe to a feature layer or table by uploading it as a zipped file gdb.
We first save the new dataframe as a layer or table in an empty geodatabase, then zip it and upload it to AGOL as a standalone item. We then call append on the target feature layer or table with this item as the source for the append, using upsert where appropriate to update existing data using OBJECTID as the join field. Afterwards, we delete the gdb item and the zipped gdb.
Args:
dataframe (pd.DataFrame): A dataframe containing data to be added or upserted to the feature layer or table. The fields must match the live fields in name, type, and length (where applicable). For feature layers, the dataframe must have a SHAPE column containing geometries of the same type as the live data.
**append_kwargs: Additional keyword arguments to pass to the append operation.
Raises:
ValueError: If the field used as a key for upsert matching is not present in either the new or live data.
RuntimeError: If the append operation fails.
Returns:
int: The number of records upserted.
"""
try:
if append_kwargs["upsert"] and (
append_kwargs["upsert_matching_field"] not in append_kwargs["append_fields"]
or append_kwargs["upsert_matching_field"] not in dataframe.columns
):
raise ValueError(
f'Upsert matching field {append_kwargs["upsert_matching_field"]} not found in either append fields or existing fields.'
)
except KeyError:
pass
self._class_logger.debug("Saving data to gdb and zipping...")
zipped_gdb_path = self._save_to_gdb_and_zip(dataframe)
self._class_logger.debug("Uploading gdb to AGOL...")
gdb_item = self._upload_gdb(zipped_gdb_path)
self._class_logger.debug("Appending data from gdb to target...")
try:
result, messages = utils.retry(
self.service.append,
item_id=gdb_item.id,
upload_format="filegdb",
source_table_name="upload",
return_messages=True,
rollback=True,
**append_kwargs,
)
if not result:
raise RuntimeError("Append failed but did not error")
except Exception as error:
raise RuntimeError("Failed to append data from gdb, changes should have been rolled back") from error
self._cleanup(gdb_item, zipped_gdb_path)
return messages["recordCount"]
def _save_to_gdb_and_zip(self, dataframe: pd.DataFrame) -> Path:
"""Save a dataframe to a gdb feature class or table, zip it, and return path to the zipped file.
Requires self.working_dir to be set. Uses pyogrio to save the dataframe to the gdb, then uses shutil.make_archive to zip the gdb. The zipped gdb is saved in self.working_dir.
Args:
dataframe (pd.DataFrame): The input dataframe to be saved.
Raises:
AttributeError: If self.working_dir is not set.
ValueError: If the layer can't be written to upload.gdb in working_dir or the gdb can't be zipped.
Returns:
Path: The path to the zipped GDB.
"""
try:
gdb_path = Path(self.working_dir) / "upload.gdb"
except TypeError as error:
raise AttributeError(f"working_dir not specified on {self.__class__.__name__}") from error
try:
#: check if the dataframe is a spatially enabled dataframe
dataframe.spatial.geometry_type # raises KeyError if this is a regular dataframe
gdf = utils.sedf_to_gdf(dataframe)
except KeyError:
gdf = gpd.GeoDataFrame(dataframe)
try:
gdf.to_file(gdb_path, layer="upload", engine="pyogrio", driver="OpenFileGDB")
except pyogrio.errors.DataSourceError as error:
raise ValueError(
f"Error writing layer to {gdb_path}. Verify {self.working_dir} exists and is writable."
) from error
try:
zipped_gdb_path = shutil.make_archive(gdb_path, "zip", root_dir=gdb_path.parent, base_dir=gdb_path.name)
except OSError as error:
raise ValueError(f"Error zipping {gdb_path}") from error
return zipped_gdb_path
def _upload_gdb(self, zipped_gdb_path: Path) -> arcgis.gis.Item:
"""Add a zipped gdb to AGOL as an item to self.gis
Args:
zipped_gdb_path (Path): Path to the zipped gdb
Raises:
RuntimeError: If there is an error uploading the gdb to AGOL
Returns:
arcgis.gis.Item: Reference to the resulting Item object in self.gis
"""
try:
gdb_item = utils.retry(
self.gis.content.add,
item_properties={
"type": "File Geodatabase",
"title": f"{self.gdb_item_prefix} Temporary gdb upload",
"snippet": "Temporary gdb upload from palletjack",
},
data=zipped_gdb_path,
)
except Exception as error:
raise RuntimeError(f"Error uploading {zipped_gdb_path} to AGOL") from error
return gdb_item
def _cleanup(self, gdb_item: arcgis.gis.Item, zipped_gdb_path: Path) -> None:
"""Remove the zipped gdb from disk and the gdb item from AGOL
Args:
gdb_item (arcgis.gis.Item): Reference to the gdb item in self.gis
zipped_gdb_path (Path): Path to the gdb on disk
Raises:
RuntimeError: If there are errors deleting the gdb item or the zipped gdb
"""
try:
gdb_item.delete()
except Exception as error:
warnings.warn(f"Error deleting gdb item {gdb_item.id} from AGOL")
warnings.warn(repr(error))
try:
zipped_gdb_path.unlink()
except Exception as error:
warnings.warn(f"Error deleting zipped gdb {zipped_gdb_path}")
warnings.warn(repr(error))
def _truncate_existing_data(self) -> None:
"""Remove all existing features from the live dataset
Raises:
RuntimeError: If the truncate fails
"""
self._class_logger.debug("Truncating...")
truncate_result = utils.retry(
self.service.manager.truncate,
asynchronous=True,
wait=True,
)
self._class_logger.debug(truncate_result)
if truncate_result["status"] != "Completed":
raise RuntimeError(f"Failed to truncate existing data in itemid {self.itemid}")
def _is_feature_layer(self) -> bool:
"""Help function to determine if we are dealing with a feature layer as opposed to a table
Returns:
bool: True if the service is a feature layer, False if it is a table
"""
return self.service_type == "layer"
class FeatureServiceAttachmentsUpdater:
"""Add or overwrite attachments in a feature service using a dataframe of the desired "new" attachments.
Updates the attachments based on a dataframe containing two columns: a join key present in the live data (the
dataframe column name must match the feature service field name) and the path of the file to attach to the feature.
While AGOL supports multiple attachments, this only accepts a single file as input.
If a matching feature in AGOL doesn't have an attachment, the file referred to by the dataframe will be uploaded.
If it does have an attachment, the existing filename is compared with the referenced file. If they are different, the
file from the dataframe will be updated. If they are the same, nothing happens.
"""
def __init__(self, gis):
"""
Args:
gis (arcgis.gis.GIS): The AGOL organization's gis object
"""
self.gis = gis
self._class_logger = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.failed_dict = {}
def _get_live_oid_and_guid_from_join_field_values(self, live_features_as_df, attachment_join_field, attachments_df):
"""Get the live Object ID and guid from the live features for only the features in attachments_df
Args:
live_features_as_df (pd.DataFrame): Spatial dataframe of all the feature layer's live data from AGOL
attachment_join_field (str): Column in attachments_df to use as a join key with live data
attachments_df (pd.DataFrame): New attachment data, including the join key and a path to the "new"
attachment
Returns:
pd.DataFrame: Attachments dataframe with corresponding live OIDs and GUIDs.
"""
self._class_logger.debug("Using %s as the join field between live and new data", attachment_join_field)
subset_df = live_features_as_df.reindex(columns=["OBJECTID", "GlobalID", attachment_join_field])
merged_df = subset_df.merge(attachments_df, on=attachment_join_field, how="inner")
self._class_logger.debug("%s features common to both live and new data", len(merged_df.index))
return merged_df
def _get_current_attachment_info_by_oid(self, live_data_subset_df):
"""Merge the live attachment data using the ObjectID as the join
Args:
live_data_subset_df (pd.DataFrame): Live data with 'OBJECTID', 'GlobalID', and new attachment data
Returns:
pd.DataFrame: Live and new attachment data in one dataframe
"""
live_attachments_df = pd.DataFrame(self.feature_layer.attachments.search())
live_attachments_subset_df = live_attachments_df.reindex(columns=["PARENTOBJECTID", "NAME", "ID"])
merged_df = live_data_subset_df.merge(
live_attachments_subset_df, left_on="OBJECTID", right_on="PARENTOBJECTID", how="left"
)
#: Cast ID field to nullable int to avoid conversion to float for np.nans
merged_df["ID"] = merged_df["ID"].astype("Int64")
return merged_df
def _create_attachment_action_df(self, attachment_eval_df, attachment_path_field):
"""Create a dataframe containing the action needed for each feature resulting from the attachment join.
If the live feature doesn't have an attachment, add the attachment. If it does, compare the file names and only
attach if they are different. Otherwise, leave null.
Args:
attachment_eval_df (pd.DataFrame): DataFrame of live attachment data, subsetted to features that matched
the join key in the new attachments
attachment_path_field (str): The column that holds the attachment path
Returns:
pd.DataFrame: attachment_eval_df with 'operation' and 'new_filename' columns added
"""
#: Get the file name from the full path
attachment_eval_df["new_filename"] = attachment_eval_df[attachment_path_field].apply(
lambda path: Path(path).name
)
#: Overwrite if different names, add if no existing name, do nothing if names are the same
attachment_eval_df["operation"] = np.nan
attachment_eval_df.loc[attachment_eval_df["NAME"] != attachment_eval_df["new_filename"], "operation"] = (
"overwrite"
)
attachment_eval_df.loc[attachment_eval_df["NAME"].isna(), "operation"] = "add"
value_counts = attachment_eval_df["operation"].value_counts(dropna=False)
for operation in ["add", "overwrite", np.nan]:
if operation not in value_counts:
value_counts[operation] = 0
self._class_logger.debug(
"Calculated attachment operations: adds: %s, overwrites: %s, none: %s",
value_counts["add"],
value_counts["overwrite"],
value_counts[np.nan],
)
return attachment_eval_df
def _add_attachments_by_oid(self, attachment_action_df, attachment_path_field):
"""Add attachments using the feature's OID based on the 'operation' field of the dataframe
Args:
attachment_action_df (pd.DataFrame): A dataframe containing 'operation', 'OBJECTID', and
attachment_path_field columns
attachment_path_field (str): The column that holds the attachment path
Returns:
int: The number of features that successfully have attachments added.
"""
adds_dict = attachment_action_df[attachment_action_df["operation"] == "add"].to_dict(orient="index")
adds_count = 0
for row in adds_dict.values():
target_oid = row["OBJECTID"]
filepath = row[attachment_path_field]
self._class_logger.debug("Add %s to OID %s", filepath, target_oid)
try:
result = self.feature_layer.attachments.add(target_oid, filepath)
except Exception:
self._class_logger.error("AGOL error while adding %s to OID %s", filepath, target_oid, exc_info=True)
self.failed_dict[target_oid] = ("add", filepath)
continue
self._class_logger.debug("%s", result)
if not result["addAttachmentResult"]["success"]:
warnings.warn(f"Failed to attach {filepath} to OID {target_oid}")
self.failed_dict[target_oid] = ("add", filepath)
continue
adds_count += 1
return adds_count
def _overwrite_attachments_by_oid(self, attachment_action_df, attachment_path_field):
"""Overwrite attachments using the feature's OID based on the 'operation' field of the dataframe
Args:
attachment_action_df (pd.DataFrame): A dataframe containing 'operation', 'OBJECTID', 'ID', 'NAME', and
attachment_path_field columns
attachment_path_field (str): The column that holds the attachment path
Returns:
int: The number of features that successfully have their attachments overwritten.
"""
overwrites_dict = attachment_action_df[attachment_action_df["operation"] == "overwrite"].to_dict(orient="index")
overwrites_count = 0
for row in overwrites_dict.values():
target_oid = row["OBJECTID"]
filepath = row[attachment_path_field]
attachment_id = row["ID"]
old_name = row["NAME"]
self._class_logger.debug(
"Overwriting %s (attachment ID %s) on OID %s with %s", old_name, attachment_id, target_oid, filepath
)
try:
result = self.feature_layer.attachments.update(target_oid, attachment_id, filepath)
except Exception:
self._class_logger.error(
"AGOL error while overwriting %s (attachment ID %s) on OID %s with %s",
old_name,
attachment_id,
target_oid,
filepath,
exc_info=True,
)
self.failed_dict[target_oid] = ("update", filepath)
continue
if not result["updateAttachmentResult"]["success"]:
warnings.warn(
f"Failed to update {old_name}, attachment ID {attachment_id}, on OID {target_oid} with {filepath}"
)
self.failed_dict[target_oid] = ("update", filepath)
continue
overwrites_count += 1
return overwrites_count
@staticmethod
def _check_attachment_dataframe_for_invalid_column_names(attachment_dataframe, invalid_names):
invalid_names_index = pd.Index(invalid_names)
intersection = attachment_dataframe.columns.intersection(invalid_names_index)
if not intersection.empty:
raise RuntimeError(f"Attachment dataframe contains the following invalid names: {list(intersection)}")
def update_attachments(
self, feature_layer_itemid, attachment_join_field, attachment_path_field, attachments_df, layer_number=0
):
"""Update a feature layer's attachments based on info from a dataframe of desired attachment file names
Depends on a dataframe populated with a join key for the live data and the downloaded or locally-available
attachments. If the name of the "new" attachment is the same as an existing attachment for that feature, it is
not updated. If it is different or there isn't an existing attachment, the "new" attachment is attached to that
feature.
Args:
feature_layer_itemid (str): The AGOL Item ID of the feature layer to update
attachment_join_field (str): The field containing the join key between the attachments dataframe and the
live data
attachment_path_field (str): The field containing the desired attachment file path
attachments_df (pd.DataFrame): A dataframe of desired attachments, including a join key and the local path
to the attachment
layer_number (int, optional): The layer within the Item ID to update. Defaults to 0.
Returns:
(int, int): Tuple of counts of successful overwrites and adds.
"""
self._class_logger.info("Updating attachments...")
#: These names are present in the live attachment data downloaded from AGOL. Because we merge the dataframes
#: later, we need to make sure they're not the same. There may be better ways of handling this that allow the
#: client names to be preserved, but for now force them to fix this.
self._check_attachment_dataframe_for_invalid_column_names(
attachments_df, invalid_names=["OBJECTID", "PARENTOBJECTID", "NAME", "ID"]
)
self._class_logger.debug("Using layer %s from item ID %s", layer_number, feature_layer_itemid)
self.feature_layer = self.gis.content.get(feature_layer_itemid).layers[layer_number]
live_features_as_df = pd.DataFrame.spatial.from_layer(self.feature_layer)
live_data_subset_df = self._get_live_oid_and_guid_from_join_field_values(
live_features_as_df, attachment_join_field, attachments_df
)
#: TODO: Make sure layer supports attachments so we don't get an arcgis error.
#: Check out the feature layer .properties and FeatureLayerManager.add_to_definition to check/enable?
attachment_eval_df = self._get_current_attachment_info_by_oid(live_data_subset_df)
attachment_action_df = self._create_attachment_action_df(attachment_eval_df, attachment_path_field)
overwrites_count = self._overwrite_attachments_by_oid(attachment_action_df, attachment_path_field)
adds_count = self._add_attachments_by_oid(attachment_action_df, attachment_path_field)
self._class_logger.info("%s attachments added, %s attachments overwritten", adds_count, overwrites_count)
return overwrites_count, adds_count
@staticmethod
def build_attachments_dataframe(input_dataframe, join_column, attachment_column, out_dir):
"""Create an attachments dataframe by subsetting down to just the two fields and dropping any rows
with null/empty attachments
Args:
input_dataframe (pd.DataFrame): Input data containing at least the join and attachment filename columns
join_column (str): Unique key joining attachments to live data
attachment_column (str): Filename for each attachment
out_dir (str or Path): Output directory, will be used to build full path to attachment
Returns:
pd.DataFrame: Dataframe with join key, attachment name, and full attachment paths
"""
input_dataframe[attachment_column].replace("", np.nan, inplace=True) #: pandas doesn't see empty strings as NAs
attachments_dataframe = (
input_dataframe[[join_column, attachment_column]].copy().dropna(subset=[attachment_column])
)
#: Create the full path by prepending the output directory using .apply and a lambda function
attachments_dataframe["full_file_path"] = attachments_dataframe[attachment_column].apply(
lambda filename: str(Path(out_dir, filename))
)
return attachments_dataframe
class ColorRampReclassifier:
"""Updates the interval ranges on a webmap's layer's classification renderer based on the layer's current data.
Manually edits the JSON definition to change a layer's color ramp values based on a simple unclassed scheme similar
to AGOL's unclassed ramp. The minimum value is the dataset minimum, the max is the mean value plus one standard
deviation.
"""
def __init__(self, webmap_item, gis):
"""
Args:
webmap_item (arcgis.mapping.WebMap): The webmap item in the AGOL organization
gis (arcgis.gis.GIS): The AGOL organization as a gis object
"""
self.webmap_item = webmap_item
self.gis = gis
self._class_logger = logging.getLogger(__name__).getChild(self.__class__.__name__)
def _get_layer_dataframe(self, layer_name, feature_layer_number=0):
"""Create a dataframe from layer_name in self.webmap_item
Args:
layer_name (str): The exact name of the layer
feature_layer_number (int): The number of the layer with the feature service to update. Defaults to 0.
Returns:
spatially-enabled data frame: The layer's data, including geometries.
"""
self._class_logger.info("Getting dataframe from `%s` on `%s`", layer_name, self.webmap_item.title)
webmap_object = arcgis.mapping.WebMap(self.webmap_item)
layer = webmap_object.get_layer(title=layer_name)
feature_layer = self.gis.content.get(layer["itemId"])
layer_dataframe = pd.DataFrame.spatial.from_layer(feature_layer.layers[feature_layer_number])
return layer_dataframe
def _get_layer_id(self, layer_name):
"""Get the ID number of layer_name in self.webmap_item
Args:
layer_name (str): The exact name of the layer
Raises:
ValueError: If the layer is not found in the webmap
Returns:
int: The index (0-based) of the layer in the web map
"""
data = self.webmap_item.get_data()
for layer_id, layer in enumerate(data["operationalLayers"]):
if layer["title"] == layer_name:
self._class_logger.debug("Layer `%s` has id `%s`", layer_name, layer_id)
return layer_id
#: If we haven't matched the title and returned a valid id, raise an error.
raise ValueError(f'Could not find "{layer_name}" in {self.webmap_item.title}')
@staticmethod
def _calculate_new_stops(dataframe, column, stops):
"""Calculate new stop values for an AGOL color ramp using what appears to be AGOL's method for unclassed ramps.
Args:
dataframe (pd.DataFrame): Data being classified
column (str): Column to classify
stops (int): Number of stops to create.
Returns:
List: New stops cast as ints
"""
if column not in dataframe.columns:
raise ValueError(f"Column `{column}` not in dataframe")
minval = dataframe[column].min()
mean = dataframe[column].mean()
std_dev = dataframe[column].std()
upper = mean + std_dev #: AGOL's default upper value for unclassed ramps seems to be mean + 1 std dev
new_stops = np.linspace(minval, upper, stops)
new_stops_ints = [int(stop) for stop in new_stops]
return new_stops_ints
def _update_stop_values(self, layer_number, new_stops):
"""Update the stop values of an (un)classified polygon renderer in an AGOL Webmap
Args:
layer_number (int): The index for the layer to be updated
new_stops (List): New values for the existing stops
Returns:
Bool: Success or failure of update operation
"""
#: Get short reference to the stops dictionary from the webmap's data json
data = self.webmap_item.get_data()
renderer = data["operationalLayers"][layer_number]["layerDefinition"]["drawingInfo"]["renderer"]
stops = renderer["visualVariables"][0]["stops"]
#: Overwrite the value, update the webmap item
for stop, new_value in zip(stops, new_stops):
stop["value"] = new_value
self._class_logger.info(
"Updating stop values on layer number `%s` in `%s`", layer_number, self.webmap_item.title
)
result = self.webmap_item.update(item_properties={"text": json.dumps(data)})
self._class_logger.debug("Update result: %s", result)
return result
def update_color_ramp_values(self, layer_name, column_name, stops=5):
"""Update the color ramp ranges for layer_name in self.webmap_item.
Does not alter colors or introduce additional stops; only overwrites the values for existing breaks.
Args:
layer_name (str): The exact name of the layer to be updated
column_name (str): The name of the attribute being displayed as an (un)classified range
stops (int, optional): The number of stops to calculate. Must match existing stops. Defaults to 5.
Returns:
Bool: Success or failure of update operation
"""
layer_id = self._get_layer_id(layer_name)
dataframe = self._get_layer_dataframe(layer_name)
new_stops = self._calculate_new_stops(dataframe, column_name, stops)
result = self._update_stop_values(layer_id, new_stops)
return result
Classes
class ColorRampReclassifier (webmap_item, gis)
Updates the interval ranges on a webmap's layer's classification renderer based on the layer's current data.
Manually edits the JSON definition to change a layer's color ramp values based on a simple unclassed scheme similar to AGOL's unclassed ramp. The minimum value is the dataset minimum, the max is the mean value plus one standard deviation.
Args
webmap_item (arcgis.mapping.WebMap): The webmap item in the AGOL organization
gis (arcgis.gis.GIS): The AGOL organization as a gis object
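A construction sketch, assuming the gis object from the module-level example and a hypothetical webmap item ID. The class's methods call get_data() and update() on webmap_item, so the Item returned by gis.content.get works here:
#: Hypothetical webmap item ID
webmap_item = gis.content.get("abcdef0123456789abcdef0123456789")
reclassifier = load.ColorRampReclassifier(webmap_item, gis)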
Methods
def update_color_ramp_values(self, layer_name, column_name, stops=5)
Update the color ramp ranges for layer_name in self.webmap_item.
Does not alter colors or introduce additional stops; only overwrites the values for existing breaks.
Args
layer_name (str): The exact name of the layer to be updated
column_name (str): The name of the attribute being displayed as an (un)classified range
stops (int, optional): The number of stops to calculate. Must match existing stops. Defaults to 5.
Returns
Bool: Success or failure of update operation
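A call sketch using the reclassifier built above; the layer title and column name are placeholders, and stops must match the number of stops already defined in the layer's renderer:
#: Layer title and column name are placeholders for your webmap's values
result = reclassifier.update_color_ramp_values("Broadband Speeds", "avg_download_speed", stops=5)
if not result:
    print("Webmap update failed")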
class FeatureServiceAttachmentsUpdater (gis)
Add or overwrite attachments in a feature service using a dataframe of the desired "new" attachments.
Updates the attachments based on a dataframe containing two columns: a join key present in the live data (the dataframe column name must match the feature service field name) and the path of the file to attach to the feature. While AGOL supports multiple attachments, this only accepts a single file as input.
If a matching feature in AGOL doesn't have an attachment, the file referred to by the dataframe will be uploaded. If it does have an attachment, the existing filename is compared with the referenced file. If they are different, the file from the dataframe will be updated. If they are the same, nothing happens.
Args
gis (arcgis.gis.GIS): The AGOL organization's gis object
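A brief construction sketch (gis as above); after a call to update_attachments, any attachments that errored or reported failure are recorded on the instance:
attachments_updater = load.FeatureServiceAttachmentsUpdater(gis)
#: After update_attachments runs, failed_dict maps each failed OID to a tuple of the
#: attempted operation ("add" or "update") and the file path
failures = attachments_updater.failed_dict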
Static methods
def build_attachments_dataframe(input_dataframe, join_column, attachment_column, out_dir)
Create an attachments dataframe by subsetting down to just the two fields and dropping any rows with null/empty attachments
Args
input_dataframe (pd.DataFrame): Input data containing at least the join and attachment filename columns
join_column (str): Unique key joining attachments to live data
attachment_column (str): Filename for each attachment
out_dir (str or Path): Output directory, will be used to build full path to attachment
Returns
pd.DataFrame: Dataframe with join key, attachment name, and full attachment paths
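A usage sketch with a hypothetical input dataframe; the column names and output directory are placeholders. Rows with empty or null attachment names are dropped, and full_file_path is built by prepending out_dir:
import pandas as pd

survey_df = pd.DataFrame(
    {
        "facility_id": [101, 102, 103],
        "photo_name": ["101.jpg", "", "103.jpg"],  #: the empty string is treated as missing and dropped
    }
)
attachments_df = load.FeatureServiceAttachmentsUpdater.build_attachments_dataframe(
    survey_df, "facility_id", "photo_name", "/data/attachment_downloads"
)
#: attachments_df now holds facility_id, photo_name, and full_file_path for the two rows with filenames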
Methods
def update_attachments(self, feature_layer_itemid, attachment_join_field, attachment_path_field, attachments_df, layer_number=0)
Update a feature layer's attachments based on info from a dataframe of desired attachment file names
Depends on a dataframe populated with a join key for the live data and the downloaded or locally-available attachments. If the name of the "new" attachment is the same as an existing attachment for that feature, it is not updated. If it is different or there isn't an existing attachment, the "new" attachment is attached to that feature.
Args
feature_layer_itemid (str): The AGOL Item ID of the feature layer to update
attachment_join_field (str): The field containing the join key between the attachments dataframe and the live data
attachment_path_field (str): The field containing the desired attachment file path
attachments_df (pd.DataFrame): A dataframe of desired attachments, including a join key and the local path to the attachment
layer_number (int, optional): The layer within the Item ID to update. Defaults to 0.
Returns
(int, int): Tuple of counts of successful overwrites and adds.
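A call sketch using the attachments_updater and attachments_df from the examples above; the item ID is a placeholder, and facility_id must exist as a field in the live data:
overwrites, adds = attachments_updater.update_attachments(
    "fedcba9876543210fedcba9876543210",  #: placeholder feature layer item ID
    attachment_join_field="facility_id",
    attachment_path_field="full_file_path",
    attachments_df=attachments_df,
    layer_number=0,
)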
class ServiceUpdater (gis: arcgis.gis.GIS, itemid: str, service_type: Literal['layer', 'table'] = 'layer', index: int = 0, working_dir: pathlib.Path | None = None, gdb_item_prefix: str = 'palletjack')
-
Update an AGOL Feature or Table Service with data from a pandas DataFrame or Spatially-enabled Dataframe.
This class represents the feature layer or table that will be updated and stores a reference to the dataset and its containing gis. It contains four methods for updating the data: add, remove, update, and truncate_and_load.
It is the client's responsibility to separate out the new data into these different steps. If the extract/transform stages result in separate groups of records that need to be added, deleted, and updated, the client must call the three different methods with dataframes containing only the respective records for each operation.
The method used to upload the data to AGOL saves the updated data as a new layer or table named upload in working_dir/upload.gdb, zips the gdb, uploads it to AGOL (with the item name `gdb_item_prefix Temporary gdb upload`), and then uses this as the source data for a call to the feature layer or table's .append() method. The geodatabase upload.gdb will be created in working_dir if it doesn't already exist. Ideally, working_dir should be a TemporaryDirectory unless persistent access to the gdb is desired.
Args
    gis (arcgis.gis.GIS): The AGOL organization's gis object
    itemid (str): The AGOL item ID of the feature layer or table to update
    service_type (Literal["layer", "table"], optional): The type of service to update. Defaults to "layer".
    index (int, optional): The index of the layer or table within the item. Defaults to 0.
    working_dir (Path, optional): The directory in which to save the gdb for uploading. Defaults to None.
    gdb_item_prefix (str, optional): The prefix to use for the gdb item name. Defaults to "palletjack".
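A construction sketch using the palletjack.load module documented here; the credentials and item ID are placeholders. A TemporaryDirectory is used for working_dir, as recommended above.

from pathlib import Path
from tempfile import TemporaryDirectory

import arcgis

from palletjack.load import ServiceUpdater

gis = arcgis.gis.GIS("https://www.arcgis.com", "username", "password")  #: placeholder credentials

with TemporaryDirectory() as temp_dir:
    updater = ServiceUpdater(
        gis,
        "0123456789abcdef0123456789abcdef",  #: placeholder hosted feature service item ID
        service_type="layer",
        index=0,
        working_dir=Path(temp_dir),
        gdb_item_prefix="my_project",
    )
    #: call updater.add(), .update(), .remove(), or .truncate_and_load() while temp_dir still exists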
Expand source code
class ServiceUpdater:
    """Update an AGOL Feature or Table Service with data from a pandas DataFrame or Spatially-enabled Dataframe.

    This class represents the feature layer or table that will be updated and stores a reference to the dataset and
    it's containing gis. It contains four methods for updating the data: add, remove, update, and truncate_and_load.

    It is the client's responsibility to separate out the new data into these different steps. If the
    extract/transform stages result in separate groups of records that need to be added, deleted, and updated, the
    client must call the three different methods with dataframes containing only the respective records for each
    operation.

    The method used to upload the data to AGOL saves the updated data as a new layer or table named upload in
    working_dir/upload.gdb, zips the gdb, uploads it to AGOL (with the item name `gdb_item_prefix Temporary gdb
    upload`), and then uses this as the source data for a call to the feature layer or tables's .append() method.
    The geodatabase upload.gdb will be created in working_dir if it doesn't already exist. Ideally, working_dir
    should be a TemporaryDirectory unless persistent access to the gdb is desired.
    """

    def __init__(
        self,
        gis: arcgis.gis.GIS,
        itemid: str,
        service_type: Literal["layer", "table"] = "layer",
        index: int = 0,
        working_dir: Path | None = None,
        gdb_item_prefix: str = "palletjack",
    ) -> None:
        """
        Args:
            gis (arcgis.gis.GIS): The AGOL organization's gis object
            itemid (str): The AGOL item ID of the feature layer or table to update
            service_type (Literal["layer", "table"], optional): The type of service to update. Defaults to "layer".
            index (int, optional): The index of the layer or table within the item. Defaults to 0.
            working_dir (Path, optional): The directory in which to save the gdb for uploading. Defaults to None.
            gdb_item_prefix (str, optional): The prefix to use for the gdb item name. Defaults to "palletjack".
        """

        self._class_logger = logging.getLogger(__name__).getChild(self.__class__.__name__)
        self.gis = gis
        if service_type == "table":
            self.service = arcgis.features.Table.fromitem(gis.content.get(itemid), table_id=index)
        else:
            self.service = arcgis.features.FeatureLayer.fromitem(gis.content.get(itemid), layer_id=index)
        self.service_type = service_type
        self.index = index
        self.itemid = itemid
        self.working_dir = working_dir
        self.gdb_item_prefix = gdb_item_prefix

    def add(self, dataframe: pd.DataFrame) -> int:
        """Adds new features/rows to existing hosted feature layer/table from a new dataframe.

        If you are working with a feature layer, the new dataframe must have a 'SHAPE' column containing geometries
        of the same type as the live data.

        The new dataframe's columns and data must match the existing data's fields (with the exception of generated
        fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable
        and don't have a default value must have a value in the new data; missing data in these fields will raise
        an error.

        Args:
            dataframe (pd.DataFrame): Dataframe of data to be added

        Raises:
            ValueError: If the new field and existing fields don't match, the SHAPE field is missing or has an
                incompatible type, the new data contains null fields, the new data exceeds the existing field
                lengths, or a specified field is missing from either new or live data.

        Returns:
            int: Number of features added
        """

        self._class_logger.info(
            "Adding items to %s index `%s` in itemid `%s` in-place",
            self.service_type,
            self.index,
            self.itemid,
        )

        fields = self.__class__._get_fields_from_dataframe(dataframe)
        self._class_logger.debug("Using fields %s", fields)

        #: Field checks to prevent various AGOL errors
        utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=False)

        #: Upload
        append_count = self._upload_data(
            dataframe,
            upsert=False,
        )

        return append_count

    def remove(self, delete_oids: list[int]) -> int:
        """Deletes features/rows from a hosted feature layer/table based on list of Object IDs

        This is a wrapper around the arcgis.FeatureLayer/Table.delete_features methods that adds some sanity
        checking. The delete operation is rolled back if any of the features/rows fail to delete using
        (rollback_on_failure=True). This function will raise a RuntimeError as well after delete() returns if any
        of them fail.

        The sanity checks will raise errors or warnings as appropriate if any of them fail.

        Args:
            delete_oids (list[int]): List of OIDs to delete

        Raises:
            ValueError: If delete_string can't be split on `,`
            TypeError: If any of the items in delete_string can't be cast to ints
            ValueError: If delete_string is empty
            UserWarning: If any of the Object IDs in delete_string don't exist in the live data
            RuntimeError: If any of the OIDs fail to delete

        Returns:
            int: The number of features deleted
        """

        self._class_logger.info(
            "Deleting features from %s index `%s` in itemid `%s`",
            self.service_type,
            self.index,
            self.itemid,
        )
        self._class_logger.debug("Delete string: %s", delete_oids)

        #: Verify delete list
        oid_numeric = utils.DeleteUtils.check_delete_oids_are_ints(delete_oids)
        utils.DeleteUtils.check_for_empty_oid_list(oid_numeric, delete_oids)
        delete_string = ",".join([str(oid) for oid in oid_numeric])
        num_missing_oids = utils.DeleteUtils.check_delete_oids_are_in_live_data(
            delete_string,
            oid_numeric,
            self.service,
        )

        #: Note: apparently not all services support rollback:
        #: https://developers.arcgis.com/rest/services-reference/enterprise/delete-features.htm
        deletes = utils.retry(
            self.service.delete_features,
            deletes=delete_string,
            rollback_on_failure=True,
        )

        failed_deletes = [result["objectId"] for result in deletes["deleteResults"] if not result["success"]]
        if failed_deletes:
            raise RuntimeError(f"The following Object IDs failed to delete: {failed_deletes}")

        #: The REST API still returns success: True on missing OIDs, so we have to track this ourselves
        actual_delete_count = len(deletes["deleteResults"]) - num_missing_oids

        return actual_delete_count

    def update(self, dataframe: pd.DataFrame, update_geometry: bool = True) -> int:
        """Updates existing features/rows within a hosted feature layer/table using OBJECTID as the join field.

        The new dataframe's columns and data must match the existing data's fields (with the exception of generated
        fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable
        and don't have a default value must have a value in the new data; missing data in these fields will raise
        an error.

        Uses the OBJECTID field to determine which features should be updated by the underlying
        FeatureLayer/Table.append() methods. The most robust way to do this is to load the live data as a
        dataframe, subset it down to the desired rows, make your edits based on a separate join id, and then pass
        that dataframe to this method.

        The new data can have either attributes and geometries or only attributes based on the update_geometry
        flag. A combination of updates from a source with both attributes & geometries and a source with
        attributes-only must be done with two separate calls. The geometries must be provided in a SHAPE column and
        be the same type as the live data.

        Args:
            dataframe (pd.DataFrame): Dataframe of data to be updated
            update_geometry (bool): Whether to update attributes and geometry (True) or just attributes (False).
                Defaults to True for feature layers and is always False for tables.

        Raises:
            ValueError: If the new field and existing fields don't match, the SHAPE field is missing or has an
                incompatible type, the new data contains null fields, the new data exceeds the existing field
                lengths, or a specified field is missing from either new or live data.
            ValueError: If update_geometry is True for a table

        Returns:
            int: Number of features updated
        """

        self._class_logger.info("Updating layer `%s` in itemid `%s` in-place", self.index, self.itemid)

        fields = self.__class__._get_fields_from_dataframe(dataframe)
        self._class_logger.debug("Updating fields %s", fields)

        #: Field checks to prevent various AGOL errors
        utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=True)

        #: Upload data
        count = self._upload_data(
            dataframe,
            upsert=True,
            upsert_matching_field="OBJECTID",
            append_fields=fields,  #: Apparently this works if append_fields is all the fields, but not a subset?
            update_geometry=update_geometry if self._is_feature_layer() else False,
        )

        return count

    def truncate_and_load(self, dataframe: pd.DataFrame, save_old: bool = False) -> int:
        """Overwrite a hosted feature layer or table by truncating and loading the new data.

        When the existing dataset is truncated, a copy is kept in memory as a dataframe. If save_old is set, this
        is saved as a layer in self.working_dir/backup.gdb with the layer/table name {name}_{todays_date}.json
        (foobar_2022-12-31.json).

        If you are working with a feature layer, the new dataframe must have a 'SHAPE' column containing geometries
        of the same type as the live data. New OBJECTIDs will be automatically generated.

        The new dataframe's columns and data must match the existing data's fields (with the exception of generated
        fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable
        and don't have a default value must have a value in the new data; missing data in these fields will raise
        an error.

        Args:
            dataframe (pd.DataFrame): Spatially enabled dataframe of new data to be loaded
            save_old (bool): Save existing data to backup.gdb in working_dir. Defaults to False

        Returns:
            int: Number of features loaded
        """

        self._class_logger.info(
            "Truncating and loading %s index `%s` in itemid `%s`",
            self.service_type,
            self.index,
            self.itemid,
        )
        start = datetime.now()

        #: Save the data to disk if desired
        if save_old:
            self._class_logger.info("Saving existing data to %s", self.working_dir)
            saved_layer_path = utils.save_to_gdb(self.service, self.working_dir)

        fields = self.__class__._get_fields_from_dataframe(dataframe)

        #: Field checks to prevent various AGOL errors
        utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=False)

        self._class_logger.info("Truncating existing data...")
        self._truncate_existing_data()

        try:
            self._class_logger.info("Loading new data...")
            append_count = self._upload_data(dataframe, upsert=False)
            self._class_logger.debug("Total truncate and load time: %s", datetime.now() - start)
        except Exception:
            if save_old:
                self._class_logger.error("Append failed. Data saved to %s", saved_layer_path)
                raise
            self._class_logger.error("Append failed. Old data not saved (save_old set to False)")
            raise

        return append_count

    @staticmethod
    def _get_fields_from_dataframe(dataframe: pd.DataFrame) -> list[str]:
        """Get the fields from a dataframe, excluding Shape_Area and Shape_Length

        Args:
            dataframe (pd.DataFrame): Dataframe to get fields from

        Returns:
            list[str]: List of the columns of the dataframe, excluding Shape_Area and Shape_Length
        """

        fields = list(dataframe.columns)
        for auto_gen_field in ["Shape_Area", "Shape_Length"]:
            try:
                fields.remove(auto_gen_field)
            except ValueError:
                continue
        return fields

    def _upload_data(self, dataframe: pd.DataFrame, **append_kwargs) -> int:
        """Append a dataframe to a feature layer or table by uploading it as a zipped file gdb.

        We first save the new dataframe as a layer or table in an empty geodatabase, then zip it and upload it to
        AGOL as a standalone item. We then call append on the target feature layer or table with this item as the
        source for the append, using upsert where appropriate to update existing data using OBJECTID as the join
        field. Afterwards, we delete the gdb item and the zipped gdb.

        Args:
            dataframe (pd.DataFrame): A dataframe containing data to be added or upserted to the feature layer or
                table. The fields must match the live fields in name, type, and length (where applicable). For
                feature layers, the dataframe must have a SHAPE column containing geometries of the same type as
                the live data.
            **append_kwargs: Additional keyword arguments to pass to the append operation.

        Raises:
            ValueError: If the field used as a key for upsert matching is not present in either the new or live
                data.
            RuntimeError: If the append operation fails.

        Returns:
            int: The number of records upserted.
        """

        try:
            if append_kwargs["upsert"] and (
                append_kwargs["upsert_matching_field"] not in append_kwargs["append_fields"]
                or append_kwargs["upsert_matching_field"] not in dataframe.columns
            ):
                raise ValueError(
                    f'Upsert matching field {append_kwargs["upsert_matching_field"]} not found in either append fields or existing fields.'
                )
        except KeyError:
            pass

        self._class_logger.debug("Saving data to gdb and zipping...")
        zipped_gdb_path = self._save_to_gdb_and_zip(dataframe)

        self._class_logger.debug("Uploading gdb to AGOL...")
        gdb_item = self._upload_gdb(zipped_gdb_path)

        self._class_logger.debug("Appending data from gdb to target...")
        try:
            result, messages = utils.retry(
                self.service.append,
                item_id=gdb_item.id,
                upload_format="filegdb",
                source_table_name="upload",
                return_messages=True,
                rollback=True,
                **append_kwargs,
            )
            if not result:
                raise RuntimeError("Append failed but did not error")
        except Exception as error:
            raise RuntimeError("Failed to append data from gdb, changes should have been rolled back") from error

        self._cleanup(gdb_item, zipped_gdb_path)

        return messages["recordCount"]

    def _save_to_gdb_and_zip(self, dataframe: pd.DataFrame) -> Path:
        """Save a dataframe to a gdb feature class or table, zip it, and return path to the zipped file.

        Requires self.working_dir to be set. Uses pyogrio to save the dataframe to the gdb, then uses
        shutil.make_archive to zip the gdb. The zipped gdb is saved in self.working_dir.

        Args:
            dataframe (pd.DataFrame): The input dataframe to be saved.

        Raises:
            ValueError: If self.working_dir is not set or the empty upload.gdb doesn't exist in it.

        Returns:
            Path: The path to the zipped GDB.
        """

        try:
            gdb_path = Path(self.working_dir) / "upload.gdb"
        except TypeError as error:
            raise AttributeError(f"working_dir not specified on {self.__class__.__name__}") from error

        try:
            #: check if the dataframe is a spatially enabled dataframe
            dataframe.spatial.geometry_type  # raises KeyError if this is a regular dataframe
            gdf = utils.sedf_to_gdf(dataframe)
        except KeyError:
            gdf = gpd.GeoDataFrame(dataframe)

        try:
            gdf.to_file(gdb_path, layer="upload", engine="pyogrio", driver="OpenFileGDB")
        except pyogrio.errors.DataSourceError as error:
            raise ValueError(
                f"Error writing layer to {gdb_path}. Verify {self.working_dir} exists and is writable."
            ) from error

        try:
            zipped_gdb_path = shutil.make_archive(gdb_path, "zip", root_dir=gdb_path.parent, base_dir=gdb_path.name)
        except OSError as error:
            raise ValueError(f"Error zipping {gdb_path}") from error

        return zipped_gdb_path

    def _upload_gdb(self, zipped_gdb_path: Path) -> arcgis.gis.Item:
        """Add a zipped gdb to AGOL as an item to self.gis

        Args:
            zipped_gdb_path (Path): Path to the zipped gdb

        Raises:
            RuntimeError: If there is an error uploading the gdb to AGOL

        Returns:
            arcgis.gis.Item: Reference to the resulting Item object in self.gis
        """

        try:
            gdb_item = utils.retry(
                self.gis.content.add,
                item_properties={
                    "type": "File Geodatabase",
                    "title": f"{self.gdb_item_prefix} Temporary gdb upload",
                    "snippet": "Temporary gdb upload from palletjack",
                },
                data=zipped_gdb_path,
            )
        except Exception as error:
            raise RuntimeError(f"Error uploading {zipped_gdb_path} to AGOL") from error

        return gdb_item

    def _cleanup(self, gdb_item: arcgis.gis.Item, zipped_gdb_path: Path) -> None:
        """Remove the zipped gdb from disk and the gdb item from AGOL

        Args:
            gdb_item (arcgis.gis.Item): Reference to the gdb item in self.gis
            zipped_gdb_path (Path): Path to the gdb on disk

        Raises:
            RuntimeError: If there are errors deleting the gdb item or the zipped gdb
        """

        try:
            gdb_item.delete()
        except Exception as error:
            warnings.warn(f"Error deleting gdb item {gdb_item.id} from AGOL")
            warnings.warn(repr(error))

        try:
            zipped_gdb_path.unlink()
        except Exception as error:
            warnings.warn(f"Error deleting zipped gdb {zipped_gdb_path}")
            warnings.warn(repr(error))

    def _truncate_existing_data(self) -> None:
        """Remove all existing features from the live dataset

        Raises:
            RuntimeError: If the truncate fails
        """

        self._class_logger.debug("Truncating...")
        truncate_result = utils.retry(
            self.service.manager.truncate,
            asynchronous=True,
            wait=True,
        )
        self._class_logger.debug(truncate_result)
        if truncate_result["status"] != "Completed":
            raise RuntimeError(f"Failed to truncate existing data in itemid {self.itemid}")

    def _is_feature_layer(self) -> bool:
        """Help function to determine if we are dealing with a feature layer as opposed to a table

        Returns:
            bool: True if the service is a feature layer, False if it is a table
        """

        return self.service_type == "layer"
Methods
def add(self, dataframe: pandas.core.frame.DataFrame) ‑> int
-
Adds new features/rows to existing hosted feature layer/table from a new dataframe.
If you are working with a feature layer, the new dataframe must have a 'SHAPE' column containing geometries of the same type as the live data.
The new dataframe's columns and data must match the existing data's fields (with the exception of generated fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and don't have a default value must have a value in the new data; missing data in these fields will raise an error.
Args
    dataframe (pd.DataFrame): Dataframe of data to be added
Raises
    ValueError: If the new field and existing fields don't match, the SHAPE field is missing or has an incompatible type, the new data contains null fields, the new data exceeds the existing field lengths, or a specified field is missing from either new or live data.
Returns
    int: Number of features added
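A hedged sketch of an add() call; the field names are placeholders and the live layer is assumed to be a point layer. pd.DataFrame.spatial.from_xy (from the arcgis GeoAccessor) is used here to build the required SHAPE column from coordinate columns.

import pandas as pd

#: New rows to append; column names and types must match the live fields (placeholders here)
new_df = pd.DataFrame(
    {
        "facility_id": ["13", "14"],
        "name": ["North Plant", "South Plant"],
        "x": [-111.9, -111.8],
        "y": [40.7, 40.6],
    }
)

#: Feature layers need a SHAPE column of the same geometry type as the live data
spatial_df = pd.DataFrame.spatial.from_xy(new_df, "x", "y", sr=4326)

#: Drop the helper coordinate columns so only live fields (plus SHAPE) remain
added_count = updater.add(spatial_df.drop(columns=["x", "y"]))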
Expand source code
def add(self, dataframe: pd.DataFrame) -> int:
    """Adds new features/rows to existing hosted feature layer/table from a new dataframe.

    If you are working with a feature layer, the new dataframe must have a 'SHAPE' column containing geometries of
    the same type as the live data.

    The new dataframe's columns and data must match the existing data's fields (with the exception of generated
    fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and
    don't have a default value must have a value in the new data; missing data in these fields will raise an error.

    Args:
        dataframe (pd.DataFrame): Dataframe of data to be added

    Raises:
        ValueError: If the new field and existing fields don't match, the SHAPE field is missing or has an
            incompatible type, the new data contains null fields, the new data exceeds the existing field lengths,
            or a specified field is missing from either new or live data.

    Returns:
        int: Number of features added
    """

    self._class_logger.info(
        "Adding items to %s index `%s` in itemid `%s` in-place",
        self.service_type,
        self.index,
        self.itemid,
    )

    fields = self.__class__._get_fields_from_dataframe(dataframe)
    self._class_logger.debug("Using fields %s", fields)

    #: Field checks to prevent various AGOL errors
    utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=False)

    #: Upload
    append_count = self._upload_data(
        dataframe,
        upsert=False,
    )

    return append_count
def remove(self, delete_oids: list[int]) ‑> int
-
Deletes features/rows from a hosted feature layer/table based on a list of Object IDs.
This is a wrapper around the arcgis FeatureLayer/Table delete_features methods that adds some sanity checking. The delete operation is rolled back (rollback_on_failure=True) if any of the features/rows fail to delete, and this method also raises a RuntimeError after delete_features() returns if any deletes failed.
The sanity checks will raise errors or warnings as appropriate if any of them fail.
Args
    delete_oids (list[int]): List of OIDs to delete
Raises
    ValueError: If delete_string can't be split on `,`
    TypeError: If any of the items in delete_string can't be cast to ints
    ValueError: If delete_string is empty
    UserWarning: If any of the Object IDs in delete_string don't exist in the live data
    RuntimeError: If any of the OIDs fail to delete
Returns
    int: The number of features deleted
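A sketch of gathering OIDs from the live data before deleting; the where clause and field name are placeholders for whatever selection logic the client uses.

#: Query the live service for the Object IDs to delete (placeholder criteria)
retired_df = updater.service.query(where="status = 'retired'", out_fields=["OBJECTID"], as_df=True)
oids_to_delete = list(retired_df["OBJECTID"])

deleted_count = updater.remove(oids_to_delete)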
Expand source code
def remove(self, delete_oids: list[int]) -> int:
    """Deletes features/rows from a hosted feature layer/table based on list of Object IDs

    This is a wrapper around the arcgis.FeatureLayer/Table.delete_features methods that adds some sanity checking.
    The delete operation is rolled back if any of the features/rows fail to delete using
    (rollback_on_failure=True). This function will raise a RuntimeError as well after delete() returns if any of
    them fail.

    The sanity checks will raise errors or warnings as appropriate if any of them fail.

    Args:
        delete_oids (list[int]): List of OIDs to delete

    Raises:
        ValueError: If delete_string can't be split on `,`
        TypeError: If any of the items in delete_string can't be cast to ints
        ValueError: If delete_string is empty
        UserWarning: If any of the Object IDs in delete_string don't exist in the live data
        RuntimeError: If any of the OIDs fail to delete

    Returns:
        int: The number of features deleted
    """

    self._class_logger.info(
        "Deleting features from %s index `%s` in itemid `%s`",
        self.service_type,
        self.index,
        self.itemid,
    )
    self._class_logger.debug("Delete string: %s", delete_oids)

    #: Verify delete list
    oid_numeric = utils.DeleteUtils.check_delete_oids_are_ints(delete_oids)
    utils.DeleteUtils.check_for_empty_oid_list(oid_numeric, delete_oids)
    delete_string = ",".join([str(oid) for oid in oid_numeric])
    num_missing_oids = utils.DeleteUtils.check_delete_oids_are_in_live_data(
        delete_string,
        oid_numeric,
        self.service,
    )

    #: Note: apparently not all services support rollback:
    #: https://developers.arcgis.com/rest/services-reference/enterprise/delete-features.htm
    deletes = utils.retry(
        self.service.delete_features,
        deletes=delete_string,
        rollback_on_failure=True,
    )

    failed_deletes = [result["objectId"] for result in deletes["deleteResults"] if not result["success"]]
    if failed_deletes:
        raise RuntimeError(f"The following Object IDs failed to delete: {failed_deletes}")

    #: The REST API still returns success: True on missing OIDs, so we have to track this ourselves
    actual_delete_count = len(deletes["deleteResults"]) - num_missing_oids

    return actual_delete_count
def truncate_and_load(self, dataframe: pandas.core.frame.DataFrame, save_old: bool = False) ‑> int
-
Overwrite a hosted feature layer or table by truncating and loading the new data.
When the existing dataset is truncated, a copy is kept in memory as a dataframe. If save_old is set, this is saved as a layer in self.working_dir/backup.gdb with the layer/table name {name}_{todays_date}.json (foobar_2022-12-31.json).
If you are working with a feature layer, the new dataframe must have a 'SHAPE' column containing geometries of the same type as the live data. New OBJECTIDs will be automatically generated.
The new dataframe's columns and data must match the existing data's fields (with the exception of generated fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and don't have a default value must have a value in the new data; missing data in these fields will raise an error.
Args
    dataframe (pd.DataFrame): Spatially enabled dataframe of new data to be loaded
    save_old (bool): Save existing data to backup.gdb in working_dir. Defaults to False.
Returns
    int: Number of features loaded
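A sketch of a full overwrite, assuming an arcgis version that provides GeoAccessor.from_geodataframe; the source file path is a placeholder, and working_dir must be set on the updater for save_old to take effect.

import geopandas as gpd
import pandas as pd

#: Read the complete replacement dataset and convert it to a spatially enabled dataframe
gdf = gpd.read_file("new_facilities.geojson")  #: placeholder path
new_spatial_df = pd.DataFrame.spatial.from_geodataframe(gdf)

#: save_old=True writes the existing data to working_dir/backup.gdb before truncating
loaded_count = updater.truncate_and_load(new_spatial_df, save_old=True)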
Expand source code
def truncate_and_load(self, dataframe: pd.DataFrame, save_old: bool = False) -> int:
    """Overwrite a hosted feature layer or table by truncating and loading the new data.

    When the existing dataset is truncated, a copy is kept in memory as a dataframe. If save_old is set, this is
    saved as a layer in self.working_dir/backup.gdb with the layer/table name {name}_{todays_date}.json
    (foobar_2022-12-31.json).

    If you are working with a feature layer, the new dataframe must have a 'SHAPE' column containing geometries of
    the same type as the live data. New OBJECTIDs will be automatically generated.

    The new dataframe's columns and data must match the existing data's fields (with the exception of generated
    fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and
    don't have a default value must have a value in the new data; missing data in these fields will raise an error.

    Args:
        dataframe (pd.DataFrame): Spatially enabled dataframe of new data to be loaded
        save_old (bool): Save existing data to backup.gdb in working_dir. Defaults to False

    Returns:
        int: Number of features loaded
    """

    self._class_logger.info(
        "Truncating and loading %s index `%s` in itemid `%s`",
        self.service_type,
        self.index,
        self.itemid,
    )
    start = datetime.now()

    #: Save the data to disk if desired
    if save_old:
        self._class_logger.info("Saving existing data to %s", self.working_dir)
        saved_layer_path = utils.save_to_gdb(self.service, self.working_dir)

    fields = self.__class__._get_fields_from_dataframe(dataframe)

    #: Field checks to prevent various AGOL errors
    utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=False)

    self._class_logger.info("Truncating existing data...")
    self._truncate_existing_data()

    try:
        self._class_logger.info("Loading new data...")
        append_count = self._upload_data(dataframe, upsert=False)
        self._class_logger.debug("Total truncate and load time: %s", datetime.now() - start)
    except Exception:
        if save_old:
            self._class_logger.error("Append failed. Data saved to %s", saved_layer_path)
            raise
        self._class_logger.error("Append failed. Old data not saved (save_old set to False)")
        raise

    return append_count
def update(self, dataframe: pandas.core.frame.DataFrame, update_geometry: bool = True) ‑> int
-
Updates existing features/rows within a hosted feature layer/table using OBJECTID as the join field.
The new dataframe's columns and data must match the existing data's fields (with the exception of generated fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and don't have a default value must have a value in the new data; missing data in these fields will raise an error.
Uses the OBJECTID field to determine which features should be updated by the underlying FeatureLayer/Table.append() methods. The most robust way to do this is to load the live data as a dataframe, subset it down to the desired rows, make your edits based on a separate join id, and then pass that dataframe to this method.
The new data can have either attributes and geometries or only attributes based on the update_geometry flag. A combination of updates from a source with both attributes & geometries and a source with attributes-only must be done with two separate calls. The geometries must be provided in a SHAPE column and be the same type as the live data.
Args
    dataframe (pd.DataFrame): Dataframe of data to be updated
    update_geometry (bool): Whether to update attributes and geometry (True) or just attributes (False). Defaults to True for feature layers and is always False for tables.
Raises
    ValueError: If the new field and existing fields don't match, the SHAPE field is missing or has an incompatible type, the new data contains null fields, the new data exceeds the existing field lengths, or a specified field is missing from either new or live data.
    ValueError: If update_geometry is True for a table
Returns
    int: Number of features updated
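A sketch of the workflow described above: load the live data, subset it by a separate join id, edit attributes, and pass the result (with OBJECTID intact) back to update(). The field names and values are placeholders.

#: Load the live data as a spatially enabled dataframe (includes OBJECTID and SHAPE)
live_df = updater.service.query(as_df=True)

#: Subset to the rows to change using a separate join id, then make the edits
edits_df = live_df[live_df["facility_id"].isin(["11", "12"])].copy()
edits_df["status"] = "active"

#: OBJECTID stays in the dataframe so the upsert can match the live rows
updated_count = updater.update(edits_df, update_geometry=True)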
Expand source code
def update(self, dataframe: pd.DataFrame, update_geometry: bool = True) -> int:
    """Updates existing features/rows within a hosted feature layer/table using OBJECTID as the join field.

    The new dataframe's columns and data must match the existing data's fields (with the exception of generated
    fields like shape area and length) in name, type, and allowable length. Live fields that are not nullable and
    don't have a default value must have a value in the new data; missing data in these fields will raise an error.

    Uses the OBJECTID field to determine which features should be updated by the underlying
    FeatureLayer/Table.append() methods. The most robust way to do this is to load the live data as a dataframe,
    subset it down to the desired rows, make your edits based on a separate join id, and then pass that dataframe
    to this method.

    The new data can have either attributes and geometries or only attributes based on the update_geometry flag. A
    combination of updates from a source with both attributes & geometries and a source with attributes-only must
    be done with two separate calls. The geometries must be provided in a SHAPE column and be the same type as the
    live data.

    Args:
        dataframe (pd.DataFrame): Dataframe of data to be updated
        update_geometry (bool): Whether to update attributes and geometry (True) or just attributes (False).
            Defaults to True for feature layers and is always False for tables.

    Raises:
        ValueError: If the new field and existing fields don't match, the SHAPE field is missing or has an
            incompatible type, the new data contains null fields, the new data exceeds the existing field lengths,
            or a specified field is missing from either new or live data.
        ValueError: If update_geometry is True for a table

    Returns:
        int: Number of features updated
    """

    self._class_logger.info("Updating layer `%s` in itemid `%s` in-place", self.index, self.itemid)

    fields = self.__class__._get_fields_from_dataframe(dataframe)
    self._class_logger.debug("Updating fields %s", fields)

    #: Field checks to prevent various AGOL errors
    utils.FieldChecker.check_fields(self.service.properties, dataframe, fields, add_oid=True)

    #: Upload data
    count = self._upload_data(
        dataframe,
        upsert=True,
        upsert_matching_field="OBJECTID",
        append_fields=fields,  #: Apparently this works if append_fields is all the fields, but not a subset?
        update_geometry=update_geometry if self._is_feature_layer() else False,
    )

    return count