Source code for picterra.forge_client

"""
Handles interfacing with the API documented at https://app.picterra.ch/public/apidocs/v2/

Note that Forge is separate from Tracer and so an API key which is valid for
one may encounter permissions issues if used with the other
"""

from __future__ import annotations

import json
import logging
import sys
import tempfile
import warnings

if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal

from typing import Any

import requests
from typing_extensions import TypeAlias

from picterra.base_client import (
    APIError,
    BaseAPIClient,
    Feature,
    FeatureCollection,
    _download_to_file,
    _upload_file_to_blobstore,
)

logger = logging.getLogger()


GrantKind: TypeAlias = Literal["folder", "detector"]


[docs] class ForgeClient(BaseAPIClient): def __init__(self, **kwargs): super().__init__("public/api/v2/", **kwargs)
[docs] def get_user_info(self) -> dict: """ Get information about the current user This endpoint is in alpha stage and may change without warning. """ resp = self.sess.get(self._full_url("users/me/")) if not resp.ok: raise APIError(resp.text) return resp.json()
[docs] def upload_raster( self, filename: str, name: str, folder_id: str | None = None, captured_at: str | None = None, identity_key: str | None = None, multispectral: bool = True, cloud_coverage: int | None = None, user_tag: str | None = None, ) -> str: """ Upload a raster to picterra. Args: filename: Local filename of raster to upload name: A human-readable name for this raster folder_id: Id of the folder this raster belongs to; if not provided, the raster will be put in the "Picterra API Project" folder captured_at: ISO-8601 date and time at which this raster was captured, YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]; e.g. "2020-01-01T12:34:56.789Z" identity_key: Personal identifier for this raster. multispectral: If True, the raster is in multispectral mode and can have an associated band specification cloud_coverage: Raster cloud coverage %. user_tag (beta): Raster tag Returns: str: The id of the uploaded raster """ data: dict[str, Any] = {"name": name, "multispectral": multispectral} if folder_id is not None: data.update({"folder_id": folder_id}) if captured_at is not None: data.update({"captured_at": captured_at}) if identity_key is not None: data.update({"identity_key": identity_key}) if cloud_coverage is not None: data.update({"cloud_coverage": cloud_coverage}) if user_tag is not None: data.update({"user_tag": user_tag}) resp = self.sess.post(self._full_url("rasters/upload/file/"), json=data) if not resp.ok: raise APIError(resp.text) data = resp.json() upload_url = str(data["upload_url"]) raster_id: str = data["raster_id"] _upload_file_to_blobstore(upload_url, filename) resp = self.sess.post(self._full_url("rasters/%s/commit/" % raster_id)) if not resp.ok: raise APIError(resp.text) self._wait_until_operation_completes(resp.json()) return raster_id
[docs] def list_folder_detectors(self, folder_id: str, page_number: int | None = None): """ List of detectors assigned to a given folder, see `ResultsPage` for the pagination access pattern. This a **beta** function, subject to change. Args: folder_id: The id of the folder to obtain the detectors for page_number: Optional page (from 1) of the list we want to retrieve Returns: ResultsPage: a ResultsPage object that contains a slice of the list of detector dictionaries, plus methods to retrieve the other pages Example: :: { "id": "id1", "name": "detector1", "is_runnable": True, "user_tag": "tag1", }, { "id": "id2", "name": "detector2", "is_runnable": False, "user_tag": "tag2", } """ return self._return_results_page( "folders/%s/detectors" % folder_id, {"page_number": page_number} if page_number is not None else None, )
[docs] def list_rasters( self, folder_id: str | None = None, search_string: str | None = None, user_tag: str | None = None, max_cloud_coverage: int | None = None, captured_before: str | None = None, captured_after: str | None = None, has_vector_layers: bool | None = None, page_number: int | None = None, ): """ List of rasters metadata, see `ResultsPage` for the pagination access pattern. Args: folder_id: The id of the folder to search rasters in search_string: The search term used to filter rasters by name user_tag: [beta] The user tag to filter rasters by max_cloud_coverage: [beta] The max_cloud_coverage of the rasters (between 0 and 100) captured_before: ISO 8601 -formatted date / time of capture we want to list the rasters since captured_after: ISO 8601 -formatted date / time of capture we want to list the rasters from has_vector_layers: [beta] Whether or not the rasters have at least one vector layer page_number: Optional page (from 1) of the list we want to retrieve Returns: ResultsPage: a ResultsPage object that contains a slice of the list of raster dictionaries Example: :: { 'id': '42', 'status': 'ready', 'name': 'raster1', 'folder_id': 'abc' }, { 'id': '43', 'status': 'ready', 'name': 'raster2', 'folder_id': 'def' } """ params: dict[str, Any] = {} if folder_id: params["folder"] = folder_id if search_string: params["search"] = search_string if user_tag is not None: params["user_tag"] = user_tag.strip() if max_cloud_coverage is not None: params["max_cloud_coverage"] = max_cloud_coverage if captured_before is not None: params["captured_before"] = captured_before if captured_after is not None: params["captured_after"] = captured_after if has_vector_layers is not None: params["has_vector_layers"] = bool(has_vector_layers) if page_number is not None: params["page_number"] = page_number return self._return_results_page("rasters", params)
[docs] def get_raster(self, raster_id: str) -> dict[str, Any]: """ Get raster information Args: raster_id: id of the raster Raises: APIError: There was an error while getting the raster information Returns: dict: Dictionary of the information """ resp = self.sess.get(self._full_url("rasters/%s/" % raster_id)) if not resp.ok: raise APIError(resp.text) return resp.json()
[docs] def edit_raster( self, raster_id: str, name: str | None = None, folder_id: str | None = None, captured_at: str | None = None, identity_key: str | None = None, multispectral_band_specification: dict | None = None, cloud_coverage: int | None = None, user_tag: str | None = None, ): """ Edits an already existing raster. Args: name: New human-readable name for this raster folder_id: Id of the new folder for this raster (move is in another project) captured_at: new ISO-8601 date and time at which this raster was captured, YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]; e.g. "2020-01-01T12:34:56.789Z" identity_key: New personal identifier for this raster. multispectral_band_specification: The new band specification, see https://docs.picterra.ch/advanced-topics/multispectral cloud_coverage: Raster cloud coverage new percentage user_tag (beta): Raster tag Returns: str: The id of the edited raster """ data: dict[str, Any] = {} if name: data.update({"name": name}) if folder_id is not None: data.update({"folder_id": folder_id}) if captured_at is not None: data.update({"captured_at": captured_at}) if identity_key is not None: data.update({"identity_key": identity_key}) if multispectral_band_specification is not None: data.update( {"multispectral_band_specification": multispectral_band_specification} ) if cloud_coverage is not None: data.update({"cloud_coverage": cloud_coverage}) if user_tag: data.update({"user_tag": user_tag}) resp = self.sess.put(self._full_url("rasters/%s/" % raster_id), json=data) if not resp.ok: raise APIError(resp.text) return raster_id
[docs] def delete_raster(self, raster_id: str): """ Deletes a given raster by its identifier Args: raster_id: The id of the raster to delete Raises: APIError: There was an error while trying to delete the raster """ resp = self.sess.delete(self._full_url("rasters/%s/" % raster_id)) if not resp.ok: raise APIError(resp.text)
[docs] def download_raster_to_file(self, raster_id: str, filename: str): """ Downloads a raster to a local file Args: raster_id: The id of the raster to download filename: The local filename where to save the raster image Raises: APIError: There was an error while trying to download the raster """ resp = self.sess.get(self._full_url("rasters/%s/download/" % raster_id)) if not resp.ok: raise APIError(resp.text) raster_url = resp.json()["download_url"] logger.debug("Trying to download raster %s from %s.." % (raster_id, raster_url)) _download_to_file(raster_url, filename)
[docs] def set_raster_detection_areas_from_file(self, raster_id: str, filename: str): """ This is an experimental feature Set detection areas from a GeoJSON file Args: raster_id: The id of the raster to which to assign the detection areas filename: The filename of a GeoJSON file. This should contain a FeatureCollection of Polygon/MultiPolygon Raises: APIError: There was an error uploading the file to cloud storage """ # Get upload URL resp = self.sess.post( self._full_url("rasters/%s/detection_areas/upload/file/" % raster_id) ) if not resp.ok: raise APIError(resp.text) data = resp.json() upload_url = data["upload_url"] upload_id = data["upload_id"] # Upload to blobstore _upload_file_to_blobstore(upload_url, filename) # Commit upload resp = self.sess.post( self._full_url( "rasters/%s/detection_areas/upload/%s/commit/" % (raster_id, upload_id) ) ) if not resp.ok: raise APIError(resp.text) self._wait_until_operation_completes(resp.json())
[docs] def remove_raster_detection_areas(self, raster_id: str): """ This is an experimental feature Remove the detection areas of a raster Args: raster_id: The id of the raster whose detection areas will be removed Raises: APIError: There was an error during the operation """ resp = self.sess.delete( self._full_url("rasters/%s/detection_areas/" % raster_id) ) if not resp.ok: raise APIError(resp.text)
[docs] def add_raster_to_detector(self, raster_id: str, detector_id: str): """ Associate a raster to a detector This a **beta** function, subject to change. Args: detector_id: The id of the detector raster_id: The id of the raster Raises: APIError: There was an error uploading the file to cloud storage """ resp = self.sess.post( self._full_url("detectors/%s/training_rasters/" % detector_id), json={"raster_id": raster_id}, ) if not resp.status_code == 201: raise APIError(resp.text)
[docs] def create_detector( self, name: str | None = None, detection_type: str = "count", output_type: str = "polygon", training_steps: int = 500, backbone: str = "resnet34", tile_size: int = 256, background_sample_ratio: float = 0.25, ) -> str: """ Creates a new detector This a **beta** function, subject to change. Please note that depending on your plan some setting cannot be different from the default ones Args: name: Name of the detector detection_type: Type of the detector (one of 'count', 'segmentation') output_type: Output type of the detector (one of 'polygon', 'bbox') training_steps: Training steps the detector (integer between 500 & 40000) backbone: detector backbone (one of 'resnet18', 'resnet34', 'resnet50') tile_size: tile size (see HTTP API docs for the allowed values) background_sample_ratio: bg sample ratio (between 0 and 1) Returns: str: The id of the detector Raises: APIError: There was an error while creating the detector """ # Build request body body_data: dict[str, Any] = {"configuration": {}} if name: body_data["name"] = name for i in ( "detection_type", "output_type", "training_steps", "backbone", "tile_size", "background_sample_ratio", ): body_data["configuration"][i] = locals()[i] # Call API and check response resp = self.sess.post(self._full_url("detectors/"), json=body_data) if not resp.status_code == 201: raise APIError(resp.text) return resp.json()["id"]
def get_detector(self, detector_id: str): resp = self.sess.get(self._full_url("detectors/%s/" % detector_id)) if not resp.status_code == 200: raise APIError(resp.text) return resp.json()
[docs] def list_detectors( self, search_string: str | None = None, user_tag: str | None = None, is_shared: bool | None = None, page_number: int | None = None, ): """ List all the detectors the user can access, see `ResultsPage` for the pagination access pattern. Args: search_string: The term used to filter detectors by name user_tag: [beta] User tag to filter detectors by is_shared: [beta] Share status to filter detectors by page_number: Optional page (from 1) of the list we want to retrieve Returns: ResultsPage: A ResultsPage object that contains a slice of the list of detector dictionaries Example: :: { 'id': '42', 'name': 'cow detector', 'configuration': { 'detection_type': 'count', 'output_type': 'bbox', 'training_steps': 787 } }, { 'id': '43', 'name': 'test5', 'configuration': { 'detection_type': 'segmentation', 'output_type': 'polygon', 'training_steps': 500 } } """ data: dict[str, Any] = {} if search_string is not None: data["search"] = search_string.strip() if user_tag is not None: data["user_tag"] = user_tag.strip() if is_shared is not None: data["is_shared"] = is_shared if page_number is not None: data["page_number"] = page_number return self._return_results_page("detectors", data)
[docs] def edit_detector( self, detector_id: str, name: str | None = None, detection_type: str | None = None, output_type: str | None = None, training_steps: int | None = None, backbone: str | None = None, tile_size: int | None = None, background_sample_ratio: float | None = None, ): """ Edit a detector This a **beta** function, subject to change. Please note that depending on your plan some settings may not be editable. Args: detector_id: identifier of the detector name: Name of the detector detection_type: The type of the detector (one of 'count', 'segmentation') output_type: The output type of the detector (one of 'polygon', 'bbox') training_steps: The training steps the detector (int in [500, 40000]) backbone: detector backbone (one of 'resnet18', 'resnet34', 'resnet50') tile_size: tile size (see HTTP API docs for the allowed values) background_sample_ratio: bg sample ratio (between 0 and 1) Raises: APIError: There was an error while editing the detector """ # Build request body body_data: dict[str, Any] = {"configuration": {}} if name: body_data["name"] = name for i in ( "detection_type", "output_type", "training_steps", "backbone", "tile_size", "background_sample_ratio", ): if locals()[i]: body_data["configuration"][i] = locals()[i] # Call API and check response resp = self.sess.put( self._full_url("detectors/%s/" % detector_id), json=body_data ) if not resp.status_code == 204: raise APIError(resp.text)
[docs] def delete_detector(self, detector_id: str): """ Deletes a given detector by its identifier Args: detector_id: The id of the detector to delete Raises: APIError: There was an error while trying to delete the detector """ resp = self.sess.delete(self._full_url("detectors/%s/" % detector_id)) if not resp.ok: raise APIError(resp.text)
[docs] def run_detector( self, detector_id: str, raster_id: str, secondary_raster_id: str | None = None ) -> str: """ Runs a detector on a raster: predictions are subject to a minimum charge of 10 MP. Args: detector_id: The id of the detector raster_id: The id of the raster secondary_raster_id: The id of the secondary raster. This needs to be provided to run change detectors. Returns: str: The id of the operation. You typically want to pass this to `download_result_to_feature_collection` """ body = {"raster_id": raster_id} if secondary_raster_id is not None: body["secondary_raster_id"] = secondary_raster_id resp = self.sess.post( self._full_url("detectors/%s/run/" % detector_id), json=body, ) if not resp.ok: raise APIError(resp.text) operation_response = resp.json() self._wait_until_operation_completes(operation_response) return operation_response["operation_id"]
[docs] def download_result_to_feature_collection(self, operation_id: str, filename: str): """ Downloads the results from a detection operation to a local GeoJSON file. Results are stored as a FeatureCollection of Multipolygon. Each feature has a 'class_name' property indicating the corresponding class name Args: operation_id: The id of the operation to download. This should be a detect operation filename: The local filename where to save the results """ results = self.get_operation_results(operation_id) # We download results to a temporary directory and then assemble them into a # FeatureCollection fc: FeatureCollection = {"type": "FeatureCollection", "features": []} for class_result in results["by_class"]: with tempfile.NamedTemporaryFile() as f: self.download_vector_layer_to_file( class_result["result"]["vector_layer_id"], f.name ) with open(f.name) as fr: vl_polygon_fc: FeatureCollection = json.load(fr) mp_feature: Feature = { "type": "Feature", "properties": {"class_name": class_result["class"]["name"]}, "geometry": {"type": "MultiPolygon", "coordinates": []}, } for poly_feat in vl_polygon_fc["features"]: mp_feature["geometry"]["coordinates"].append( poly_feat["geometry"]["coordinates"] ) fc["features"].append(mp_feature) with open(filename, "w") as f: json.dump(fc, f)
[docs] def download_result_to_file(self, operation_id: str, filename: str): """ Downloads a set of results to a local GeoJSON file .. deprecated:: 1.0.0 Use `download_result_to_feature_collection` instead Args: operation_id: The id of the operation to download filename: The local filename where to save the results """ warnings.warn( "This function is deprecated. Use download_result_to_feature_collection instead", DeprecationWarning, ) result_url = self.get_operation_results(operation_id)["url"] logger.debug("Trying to download result %s.." % result_url) _download_to_file(result_url, filename)
[docs] def set_annotations( self, detector_id: str, raster_id: str, annotation_type: Literal[ "outline", "training_area", "testing_area", "validation_area" ], annotations: dict[str, Any], class_id: str | None = None, ): """ Replaces the annotations of type 'annotation_type' with 'annotations', for the given raster-detector pair. Args: detector_id: The id of the detector raster_id: The id of the raster annotation_type: One of (outline, training_area, testing_area, validation_area) annotations: GeoJSON representation of the features to upload class_id: The class id to which to associate the new annotations. Only valid if annotation_type is "outline" """ # Get an upload url create_upload_resp = self.sess.post( self._full_url( "detectors/%s/training_rasters/%s/%s/upload/bulk/" % (detector_id, raster_id, annotation_type) ) ) if not create_upload_resp.ok: raise APIError(create_upload_resp.text) upload = create_upload_resp.json() upload_url = upload["upload_url"] upload_id = upload["upload_id"] # Given we do not use self.sess the timeout is disabled (requests default), and this # is good as file upload can take a long time upload_resp = requests.put(upload_url, json=annotations) if not upload_resp.ok: logger.error( "Error when sending annotation upload %s to blobstore at url %s" % (upload_id, upload_url) ) raise APIError(upload_resp.text) # Commit upload body = {} if class_id is not None: body["class_id"] = class_id commit_upload_resp = self.sess.post( self._full_url( "detectors/%s/training_rasters/%s/%s/upload/bulk/%s/commit/" % (detector_id, raster_id, annotation_type, upload_id) ), json=body, ) if not commit_upload_resp.ok: raise APIError(commit_upload_resp.text) # Poll for operation completion self._wait_until_operation_completes(commit_upload_resp.json())
[docs] def train_detector(self, detector_id: str): """ Start the training of a detector Args: detector_id: The id of the detector """ resp = self.sess.post(self._full_url("detectors/%s/train/" % detector_id)) if not resp.ok: raise APIError(resp.text) return self._wait_until_operation_completes(resp.json())
[docs] def run_dataset_recommendation(self, detector_id: str): """ This is an **experimental** feature Runs dataset recommendation on a detector. Note that you currently have to use the UI to be able to view the recommendation markers/report. Args: detector_id: The id of the detector """ resp = self.sess.post( self._full_url("detectors/%s/dataset_recommendation/" % detector_id) ) if not resp.ok: raise APIError(resp.text) return self._wait_until_operation_completes(resp.json())
[docs] def upload_vector_layer( self, raster_id: str, filename: str, name: str | None = None, color: str | None = None, ) -> str: """ Uploads a vector layer from a GeoJSON file This a **beta** function, subject to change. Args: raster_id: The id of the raster we want to attach the vector layer to filename: Path to the local GeoJSOn file we want to upload name: Optional name to give to the vector layer color: Optional color of the vector layer, has an HTML hex color code (eg "#aabbcc") Returns: str: the vector layer unique identifier """ resp = self.sess.post(self._full_url("vector_layers/%s/upload/" % raster_id)) if not resp.ok: raise APIError(resp.text) upload = resp.json() upload_id, upload_url = upload["upload_id"], upload["upload_url"] _upload_file_to_blobstore(upload_url, filename) data = {} if name is not None: data["name"] = name if color is not None: data["color"] = color resp = self.sess.post( self._full_url( "vector_layers/%s/upload/%s/commit/" % (raster_id, upload_id) ), json=data, ) if not resp.ok: raise APIError(resp.text) op = self._wait_until_operation_completes(resp.json()) return op["results"]["vector_layer_id"]
[docs] def edit_vector_layer( self, vector_layer_id: str, name: str | None = None, color: str | None = None ): """ Edits a vector layer This a **beta** function, subject to change. Args: vector_layer_id: The id of the vector layer to remove name: new name color: new color """ data = {} if name: data.update({"name": name}) if color is not None: data.update({"color": color}) resp = self.sess.put( self._full_url("vector_layers/%s/" % vector_layer_id), json=data ) if not resp.ok: raise APIError(resp.text)
[docs] def delete_vector_layer(self, vector_layer_id: str): """ Removes a vector layer This a **beta** function, subject to change. Args: vector_layer_id: The id of the vector layer to remove """ resp = self.sess.delete(self._full_url("vector_layers/%s/" % vector_layer_id)) if not resp.ok: raise APIError(resp.text)
[docs] def download_vector_layer_to_file(self, vector_layer_id: str, filename: str): """ Downloads a vector layer This a **beta** function, subject to change. Args: vector_layer_id: The id of the vector layer to download filename: existing file to save the vector layer in, as a feature collection of polygons """ resp = self.sess.post( self._full_url("vector_layers/%s/download/" % vector_layer_id) ) if not resp.ok: raise APIError(resp.text) op = self._wait_until_operation_completes(resp.json()) _download_to_file(op["results"]["download_url"], filename)
[docs] def list_raster_markers( self, raster_id: str, page_number: int | None = None, ): """ This a **beta** function, subject to change. List all the markers on a raster, see `ResultsPage` for the pagination access pattern. Args: raster_id: The id of the raster page_number: Optional page (from 1) of the list we want to retrieve """ return self._return_results_page( "rasters/%s/markers/" % raster_id, {"page_number": page_number} if page_number is not None else None, )
[docs] def create_marker( self, raster_id: str, detector_id: str | None, lng: float, lat: float, text: str, ) -> dict[str, Any]: """ This is an **experimental** (beta) feature Creates a marker Args: raster_id: The id of the raster (belonging to detector) to create the marker on detector_id: The id of the detector to create the marker on. If this is None, the marker is created associated with the raster only Raises: APIError: There was an error while creating the marker """ if detector_id is None: url = "rasters/%s/markers/" % raster_id else: url = "detectors/%s/training_rasters/%s/markers/" % (detector_id, raster_id) data = { "marker": {"type": "Point", "coordinates": [lng, lat]}, "text": text, } resp = self.sess.post(self._full_url(url), json=data) if not resp.ok: raise APIError(resp.text) return resp.json()
[docs] def import_raster_from_remote_source( self, raster_name: str, folder_id: str, source_id: str, aoi_filename: str, method: Literal["streaming"] = "streaming", ) -> str: """ Import a raster from a remote imagery source given a GeoJSON file for the AOI Args: raster_name: Name of the new raster folder_id: The id of the folder / project the raster will live in source_id: The id of the remote imagery source to import from filename: The filename of a GeoJSON file. This should contain a FeatureCollection of Polygon/MultiPolygon representing the AOI of the new raster Raises: APIError: There was an error during import """ # Get upload URL resp = self.sess.post(self._full_url("rasters/import/")) if not resp.ok: raise APIError(resp.text) data = resp.json() upload_url = data["upload_url"] upload_id = data["upload_id"] # Upload to blobstore _upload_file_to_blobstore(upload_url, aoi_filename) # Commit upload resp = self.sess.post( self._full_url(f"rasters/import/{upload_id}/commit/"), json={ "method": method, "source_id": source_id, "folder_id": folder_id, "name": raster_name, }, ) if not resp.ok: raise APIError(resp.text) # Poll operation and get raster identifier operation = self._wait_until_operation_completes(resp.json()) return operation["metadata"]["raster_id"]
[docs] def list_raster_vector_layers( self, raster_id: str, search: str | None = None, detector_id: str | None = None, page_number: int | None = None, ): """ This a **beta** function, subject to change. List all the vector layers on a raster, see `ResultsPage` for the pagination access pattern. Args: raster_id: The id of the raster search: Optional string to search layers by name page_number: Optional page (from 1) of the list we want to retrieve """ params: dict[str, str | int] = {} if search is not None: params["search"] = search if detector_id is not None: params["detector"] = detector_id if page_number is not None: params["page_number"] = page_number url = "rasters/%s/vector_layers/" % raster_id return self._return_results_page(url, params)
[docs] def list_detector_rasters( self, detector_id: str, page_number: int | None = None, ): """ This a **beta** function, subject to change. List rasters of a detector, see `ResultsPage` for the pagination access pattern. Args: detector_id: The id of the detector page_number: Optional page (from 1) of the list we want to retrieve """ params: dict[str, int] = {} if page_number is not None: params["page_number"] = page_number url = "detectors/%s/training_rasters/" % detector_id return self._return_results_page(url, params)
[docs] def create_folder(self, name: str) -> str: """ Creates a new folder with the given name Args: name: Name of the new folder Returns: str: The id of the folder Raises: APIError: There was an error while creating the folder """ resp = self.sess.post(self._full_url("folders/"), json={"name": name}) if not resp.status_code == 201: raise APIError(resp.text) return resp.json()["id"]
[docs] def get_authorization_grants(self, kind: GrantKind, resource_id: str): """ **beta** function. Get the authorization grants for a given resource. Args: kind: The kind of resource to get the grants for resource_id: The ID of the resource Returns: dict: A dictionary containing the authorization grants for the resource. See https://app.picterra.ch/public/apidocs/v2#tag/authorization/operation/getGrants """ resp = self.sess.get( self._full_url("authorization/grants/%s/%s/" % (kind, resource_id)) ) if not resp.status_code == 200: raise APIError(resp.text) return resp.json()
[docs] def set_authorization_grants( self, kind: GrantKind, resource_id: str, grants_data: dict, ): """ **beta** function. Set the authorization grants for a given resource. Args: kind: The kind of resource to set the grants for resource_id: The ID of the resource grants: See https://app.picterra.ch/public/apidocs/v2#tag/authorization/operation/setGrants Returns: dict: The updated authorization grants for the resource. """ resp = self.sess.post( self._full_url("authorization/grants/%s/%s/" % (kind, resource_id)), json=grants_data, ) if not resp.status_code == 201: raise APIError(resp.text) return resp.json()