import json
import logging
import os
import tempfile
import time
import warnings
from collections.abc import Callable
from typing import Any, Generic, Iterator, Literal, TypedDict, TypeVar
from urllib.parse import urlencode, urljoin
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
logger = logging.getLogger()
CHUNK_SIZE_BYTES = 8192 # 8 KiB
[docs]
class APIError(Exception):
"""Generic API error exception"""
pass
class _RequestsSession(requests.Session):
"""
Override requests session to to implement a global session timeout
"""
def __init__(self, *args, **kwargs):
self.timeout = kwargs.pop("timeout")
super().__init__(*args, **kwargs)
self.headers.update(
{"User-Agent": "picterra-python %s" % self.headers["User-Agent"]}
)
def request(self, *args, **kwargs):
kwargs.setdefault("timeout", self.timeout)
return super().request(*args, **kwargs)
def _download_to_file(url: str, filename: str):
# Given we do not use self.sess the timeout is disabled (requests default), and this
# is good as file download can take a long time
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(filename, "wb+") as f:
logger.debug("Downloading to file %s.." % filename)
for chunk in r.iter_content(chunk_size=CHUNK_SIZE_BYTES):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
def _upload_file_to_blobstore(upload_url: str, filename: str):
if not (os.path.exists(filename) and os.path.isfile(filename)):
raise ValueError("Invalid file: " + filename)
with open(
filename, "rb"
) as f: # binary recommended by requests stream upload (see link below)
logger.debug("Opening and streaming to upload file %s" % filename)
# Given we do not use self.sess the timeout is disabled (requests default), and this
# is good as file upload can take a long time. Also we use requests streaming upload
# (https://requests.readthedocs.io/en/latest/user/advanced/#streaming-uploads) to avoid
# reading the (potentially large) layer GeoJSON in memory
resp = requests.put(upload_url, data=f)
if not resp.ok:
logger.error("Error when uploading to blobstore %s" % upload_url)
raise APIError(resp.text)
T = TypeVar("T")
[docs]
class ResultsPage(Generic[T]):
"""
Interface for a paginated response from the API
Typically the endpoint returning list of objects return them splitted
in pages (page 1, page 2, etc..) of a fixed dimension (eg 20). Thus
each `list_XX` function returns a ResultsPage (by default the first one);
once you have a ResultsPage for a given list of objects, you can:
* check its length with `len()` (eg `len(page)`)
* access a single element with the index operator `[]` (eg `page[5]`)
* turn it into a list of dictionaries with `list()` (eg `list(page)`)
* get the next page with `.next()` (eg `page.next()`); this could return
None if the list is finished
You can also get a specific page passing the page number to the `list_XX` function
"""
def __init__(self, url: str, fetch: Callable[[str], requests.Response]):
resp = fetch(url)
if not resp.ok:
raise APIError(resp.text)
r: dict[str, Any] = resp.json()
next_url: str | None = r["next"]
results: list[T] = r["results"]
self._fetch = fetch
self._next_url = next_url
self._results = results
self._url = url
def next(self):
return ResultsPage(self._next_url, self._fetch) if self._next_url else None
def __len__(self) -> int:
return len(self._results)
def __getitem__(self, key: int) -> T:
return self._results[key]
def __iter__(self) -> Iterator[T]:
return iter([self._results[i] for i in range(len(self._results))])
def __str__(self) -> str:
return f"{len(self._results)} results from {self._url}"
class Feature(TypedDict):
type: Literal["Feature"]
properties: dict[str, Any]
geometry: dict[str, Any]
class FeatureCollection(TypedDict):
type: Literal["FeatureCollection"]
features: list[Feature]
[docs]
class APIClient:
"""Main client class for the Picterra API"""
def __init__(
self, timeout: int = 30, max_retries: int = 3, backoff_factor: int = 10
):
"""
Args:
timeout: number of seconds before the request times out
max_retries: max attempts when ecountering gateway issues or throttles; see
retry_strategy comment below
backoff_factor: factor used nin the backoff algorithm; see retry_strategy comment below
"""
base_url = os.environ.get(
"PICTERRA_BASE_URL", "https://app.picterra.ch/public/api/v2/"
)
api_key = os.environ.get("PICTERRA_API_KEY", None)
if not api_key:
raise APIError("PICTERRA_API_KEY environment variable is not defined")
logger.info(
"Using base_url=%s; %d max retries, %d backoff and %s timeout.",
base_url,
max_retries,
backoff_factor,
timeout,
)
self.base_url = base_url
# Create the session with a default timeout (30 sec), that we can then
# override on a per-endpoint basis (will be disabled for file uploads and downloads)
self.sess = _RequestsSession(timeout=timeout)
# Retry: we set the HTTP codes for our throttle (429) plus possible gateway problems (50*),
# and for polling methods (GET), as non-idempotent ones should be addressed via idempotency
# key mechanism; given the algorithm is {<backoff_factor> * (2 **<retries-1>}, and we
# default to 30s for polling and max 30 req/min, the default 5-10-20 sequence should
# provide enough room for recovery
retry_strategy = Retry(
total=max_retries,
status_forcelist=[429, 502, 503, 504],
backoff_factor=backoff_factor,
allowed_methods=["GET"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.sess.mount("https://", adapter)
self.sess.mount("http://", adapter)
# Authentication
self.sess.headers.update({"X-Api-Key": api_key})
def _api_url(self, path: str, params: dict[str, Any] | None = None):
base_url = urljoin(self.base_url, path)
if not params:
return base_url
else:
qstr = urlencode(params)
return "%s?%s" % (base_url, qstr)
def _wait_until_operation_completes(
self, operation_response: dict[str, Any]
) -> dict[str, Any]:
"""Polls an operation an returns its data"""
operation_id = operation_response["operation_id"]
poll_interval = operation_response["poll_interval"]
# Just sleep for a short while the first time
time.sleep(poll_interval * 0.1)
while True:
logger.info("Polling operation id %s" % operation_id)
resp = self.sess.get(
self._api_url("operations/%s/" % operation_id),
)
if not resp.ok:
raise APIError(resp.text)
status = resp.json()["status"]
logger.info("status=%s" % status)
if status == "success":
break
if status == "failed":
raise APIError("Operation %s failed" % operation_id)
time.sleep(poll_interval)
return resp.json()
def _return_results_page(
self, resource_endpoint: str, params: dict[str, Any] | None = None
) -> ResultsPage:
if params is None:
params = {}
if "page_number" not in params:
params["page_number"] = 1
url = self._api_url("%s/" % resource_endpoint, params=params)
return ResultsPage(url, self.sess.get)
[docs]
def upload_raster(
self,
filename: str,
name: str,
folder_id: str | None = None,
captured_at: str | None = None,
identity_key: str | None = None,
multispectral: bool = False,
cloud_coverage: int | None = None,
user_tag: str | None = None,
) -> str:
"""
Upload a raster to picterra.
Args:
filename: Local filename of raster to upload
name: A human-readable name for this raster
folder_id: Id of the folder this raster
belongs to; if not provided, the raster will be put in the
"Picterra API Project" folder
captured_at: ISO-8601 date and time at which this
raster was captured, YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z];
e.g. "2020-01-01T12:34:56.789Z"
identity_key: Personal identifier for this raster.
multispectral: If True, the raster is in multispectral mode and can have
an associated band specification
cloud_coverage: Raster cloud coverage %.
user_tag (beta): Raster tag
Returns:
raster_id: The id of the uploaded raster
"""
data: dict[str, Any] = {"name": name, "multispectral": multispectral}
if folder_id is not None:
data.update({"folder_id": folder_id})
if captured_at is not None:
data.update({"captured_at": captured_at})
if identity_key is not None:
data.update({"identity_key": identity_key})
if cloud_coverage is not None:
data.update({"cloud_coverage": cloud_coverage})
if user_tag is not None:
data.update({"user_tag": user_tag})
resp = self.sess.post(self._api_url("rasters/upload/file/"), json=data)
if not resp.ok:
raise APIError(resp.text)
data = resp.json()
upload_url = str(data["upload_url"])
raster_id: str = data["raster_id"]
_upload_file_to_blobstore(upload_url, filename)
resp = self.sess.post(self._api_url("rasters/%s/commit/" % raster_id))
if not resp.ok:
raise APIError(resp.text)
self._wait_until_operation_completes(resp.json())
return raster_id
[docs]
def list_folder_detectors(self, folder_id: str, page_number: int | None = None):
"""
List of detectors assigned to a given folder, see `ResultsPage`
for the pagination access pattern.
This a **beta** function, subject to change.
Args:
folder_id: The id of the folder to obtain the detectors for
page_number: Optional page (from 1) of the list we want to retrieve
Returns:
A ResultsPage object that contains a slice of the list of detector dictionaries,
plus methods to retrieve the other pages
Example:
::
{
"id": "id1",
"name": "detector1",
"is_runnable": True,
"user_tag": "tag1",
},
{
"id": "id2",
"name": "detector2",
"is_runnable": False,
"user_tag": "tag2",
}
"""
return self._return_results_page(
"folders/%s/detectors" % folder_id,
{"page_number": page_number} if page_number is not None else None,
)
[docs]
def list_rasters(
self,
folder_id: str | None = None,
search_string: str | None = None,
user_tag: str | None = None,
max_cloud_coverage: int | None = None,
captured_before: str | None = None,
captured_after: str | None = None,
has_vector_layers: bool | None = None,
page_number: int | None = None,
):
"""
List of rasters metadata, see `ResultsPage` for the pagination access pattern.
Args:
folder_id: The id of the folder to search rasters in
search_string: The search term used to filter rasters by name
user_tag: [beta] The user tag to filter rasters by
max_cloud_coverage: [beta] The max_cloud_coverage of the rasters (between 0 and 100)
captured_before: ISO 8601 -formatted date / time of capture
we want to list the rasters since
captured_after: ISO 8601 -formatted date / time of capture
we want to list the rasters from
has_vector_layers: [beta] Whether or not the rasters have at least one vector layer
page_number: Optional page (from 1) of the list we want to retrieve
Returns:
A list of rasters dictionaries
Example:
::
{
'id': '42',
'status': 'ready',
'name': 'raster1',
'folder_id': 'abc'
},
{
'id': '43',
'status': 'ready',
'name': 'raster2',
'folder_id': 'def'
}
"""
params: dict[str, Any] = {}
if folder_id:
params["folder"] = folder_id
if search_string:
params["search"] = search_string
if user_tag is not None:
params["user_tag"] = user_tag.strip()
if max_cloud_coverage is not None:
params["max_cloud_coverage"] = max_cloud_coverage
if captured_before is not None:
params["captured_before"] = captured_before
if captured_after is not None:
params["captured_after"] = captured_after
if has_vector_layers is not None:
params["has_vector_layers"] = bool(has_vector_layers)
if page_number is not None:
params["page_number"] = page_number
return self._return_results_page("rasters", params)
[docs]
def get_raster(self, raster_id: str) -> dict[str, Any]:
"""
Get raster information
Args:
raster_id: id of the raster
Raises:
APIError: There was an error while getting the raster information
Returns:
dict: Dictionary of the information
"""
resp = self.sess.get(self._api_url("rasters/%s/" % raster_id))
if not resp.ok:
raise APIError(resp.text)
return resp.json()
[docs]
def edit_raster(
self,
raster_id: str,
name: str | None = None,
folder_id: str | None = None,
captured_at: str | None = None,
identity_key: str | None = None,
multispectral_band_specification: dict | None = None,
cloud_coverage: int | None = None,
user_tag: str | None = None,
):
"""
Edits an already existing raster.
Args:
name: New human-readable name for this raster
folder_id: Id of the new folder for this raster (move is in another project)
captured_at: new ISO-8601 date and time at which this
raster was captured, YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z];
e.g. "2020-01-01T12:34:56.789Z"
identity_key: New personal identifier for this raster.
multispectral_band_specification: The new band specification,
see https://docs.picterra.ch/advanced-topics/multispectral
cloud_coverage: Raster cloud coverage new percentage
user_tag (beta): Raster tag
Returns:
raster_id: The id of the edited raster
"""
data: dict[str, Any] = {}
if name:
data.update({"name": name})
if folder_id is not None:
data.update({"folder_id": folder_id})
if captured_at is not None:
data.update({"captured_at": captured_at})
if identity_key is not None:
data.update({"identity_key": identity_key})
if multispectral_band_specification is not None:
data.update(
{"multispectral_band_specification": multispectral_band_specification}
)
if cloud_coverage is not None:
data.update({"cloud_coverage": cloud_coverage})
if user_tag:
data.update({"user_tag": user_tag})
resp = self.sess.put(self._api_url("rasters/%s/" % raster_id), json=data)
if not resp.ok:
raise APIError(resp.text)
return raster_id
[docs]
def delete_raster(self, raster_id: str):
"""
Deletes a given raster by its identifier
Args:
raster_id: The id of the raster to delete
Raises:
APIError: There was an error while trying to delete the raster
"""
resp = self.sess.delete(self._api_url("rasters/%s/" % raster_id))
if not resp.ok:
raise APIError(resp.text)
[docs]
def download_raster_to_file(self, raster_id: str, filename: str):
"""
Downloads a raster to a local file
Args:
raster_id: The id of the raster to download
filename: The local filename where to save the raster image
Raises:
APIError: There was an error while trying to download the raster
"""
resp = self.sess.get(self._api_url("rasters/%s/download/" % raster_id))
if not resp.ok:
raise APIError(resp.text)
raster_url = resp.json()["download_url"]
logger.debug("Trying to download raster %s from %s.." % (raster_id, raster_url))
_download_to_file(raster_url, filename)
[docs]
def set_raster_detection_areas_from_file(self, raster_id: str, filename: str):
"""
This is an experimental feature
Set detection areas from a GeoJSON file
Args:
raster_id: The id of the raster to which to assign the detection areas
filename: The filename of a GeoJSON file. This should contain a FeatureCollection
of Polygon/MultiPolygon
Raises:
APIError: There was an error uploading the file to cloud storage
"""
# Get upload URL
resp = self.sess.post(
self._api_url("rasters/%s/detection_areas/upload/file/" % raster_id)
)
if not resp.ok:
raise APIError(resp.text)
data = resp.json()
upload_url = data["upload_url"]
upload_id = data["upload_id"]
# Upload to blobstore
_upload_file_to_blobstore(upload_url, filename)
# Commit upload
resp = self.sess.post(
self._api_url(
"rasters/%s/detection_areas/upload/%s/commit/" % (raster_id, upload_id)
)
)
if not resp.ok:
raise APIError(resp.text)
self._wait_until_operation_completes(resp.json())
[docs]
def remove_raster_detection_areas(self, raster_id: str):
"""
This is an experimental feature
Remove the detection areas of a raster
Args:
raster_id: The id of the raster whose detection areas will be removed
Raises:
APIError: There was an error during the operation
"""
resp = self.sess.delete(
self._api_url("rasters/%s/detection_areas/" % raster_id)
)
if not resp.ok:
raise APIError(resp.text)
[docs]
def add_raster_to_detector(self, raster_id: str, detector_id: str):
"""
Associate a raster to a detector
This a **beta** function, subject to change.
Args:
detector_id: The id of the detector
raster_id: The id of the raster
Raises:
APIError: There was an error uploading the file to cloud storage
"""
resp = self.sess.post(
self._api_url("detectors/%s/training_rasters/" % detector_id),
json={"raster_id": raster_id},
)
if not resp.status_code == 201:
raise APIError(resp.text)
[docs]
def create_detector(
self,
name: str | None = None,
detection_type: str = "count",
output_type: str = "polygon",
training_steps: int = 500,
backbone: str = "resnet34",
tile_size: int = 256,
background_sample_ratio: float = 0.25,
) -> str:
"""
Creates a new detector
This a **beta** function, subject to change.
Please note that depending on your plan some setting cannot be different
from the default ones
Args:
name: Name of the detector
detection_type: Type of the detector (one of 'count', 'segmentation')
output_type: Output type of the detector (one of 'polygon', 'bbox')
training_steps: Training steps the detector (integer between 500 & 40000)
backbone: detector backbone (one of 'resnet18', 'resnet34', 'resnet50')
tile_size: tile size (see HTTP API docs for the allowed values)
background_sample_ratio: bg sample ratio (between 0 and 1)
Returns:
The id of the detector
Raises:
APIError: There was an error while creating the detector
"""
# Build request body
body_data: dict[str, Any] = {"configuration": {}}
if name:
body_data["name"] = name
for i in (
"detection_type",
"output_type",
"training_steps",
"backbone",
"tile_size",
"background_sample_ratio",
):
body_data["configuration"][i] = locals()[i]
# Call API and check response
resp = self.sess.post(self._api_url("detectors/"), json=body_data)
if not resp.status_code == 201:
raise APIError(resp.text)
return resp.json()["id"]
[docs]
def list_detectors(
self,
search_string: str | None = None,
user_tag: str | None = None,
is_shared: bool | None = None,
page_number: int | None = None,
):
"""
List all the detectors the user can access, see `ResultsPage`
for the pagination access pattern.
Args:
search_string: The term used to filter detectors by name
user_tag: [beta] User tag to filter detectors by
is_shared: [beta] Share status to filter detectors by
page_number: Optional page (from 1) of the list we want to retrieve
Returns:
A list of detectors dictionaries
Example:
::
{
'id': '42',
'name': 'cow detector',
'configuration': {
'detection_type': 'count',
'output_type': 'bbox',
'training_steps': 787
}
},
{
'id': '43',
'name': 'test5',
'configuration': {
'detection_type': 'segmentation',
'output_type': 'polygon',
'training_steps': 500
}
}
"""
data: dict[str, Any] = {}
if search_string is not None:
data["search"] = search_string.strip()
if user_tag is not None:
data["user_tag"] = user_tag.strip()
if is_shared is not None:
data["is_shared"] = is_shared
if page_number is not None:
data["page_number"] = page_number
return self._return_results_page("detectors", data)
[docs]
def edit_detector(
self,
detector_id: str,
name: str | None = None,
detection_type: str | None = None,
output_type: str | None = None,
training_steps: int | None = None,
backbone: str | None = None,
tile_size: int | None = None,
background_sample_ratio: float | None = None,
):
"""
Edit a detector
This a **beta** function, subject to change.
Please note that depending on your plan some settings may not be editable.
Args:
detector_id: identifier of the detector
name: Name of the detector
detection_type: The type of the detector (one of 'count', 'segmentation')
output_type: The output type of the detector (one of 'polygon', 'bbox')
training_steps: The training steps the detector (int in [500, 40000])
backbone: detector backbone (one of 'resnet18', 'resnet34', 'resnet50')
tile_size: tile size (see HTTP API docs for the allowed values)
background_sample_ratio: bg sample ratio (between 0 and 1)
Raises:
APIError: There was an error while editing the detector
"""
# Build request body
body_data: dict[str, Any] = {"configuration": {}}
if name:
body_data["name"] = name
for i in (
"detection_type",
"output_type",
"training_steps",
"backbone",
"tile_size",
"background_sample_ratio",
):
if locals()[i]:
body_data["configuration"][i] = locals()[i]
# Call API and check response
resp = self.sess.put(
self._api_url("detectors/%s/" % detector_id), json=body_data
)
if not resp.status_code == 204:
raise APIError(resp.text)
[docs]
def delete_detector(self, detector_id: str):
"""
Deletes a given detector by its identifier
Args:
detector_id: The id of the detector to delete
Raises:
APIError: There was an error while trying to delete the detector
"""
resp = self.sess.delete(self._api_url("detectors/%s/" % detector_id))
if not resp.ok:
raise APIError(resp.text)
[docs]
def run_detector(self, detector_id: str, raster_id: str) -> str:
"""
Runs a detector on a raster: predictions are subject to a minimum charge
of 10 MP.
Args:
detector_id: The id of the detector
raster_id: The id of the raster
Returns:
operation_id: The id of the operation. You typically want to pass this
to `download_result_to_feature_collection`
"""
resp = self.sess.post(
self._api_url("detectors/%s/run/" % detector_id),
json={"raster_id": raster_id},
)
if not resp.ok:
raise APIError(resp.text)
operation_response = resp.json()
self._wait_until_operation_completes(operation_response)
return operation_response["operation_id"]
[docs]
def download_result_to_file(self, operation_id: str, filename: str):
"""
Downloads a set of results to a local GeoJSON file
.. deprecated:: 1.0.0
Use `download_result_to_feature_collection` instead
Args:
operation_id: The id of the operation to download
filename: The local filename where to save the results
"""
warnings.warn(
"This function is deprecated. Use download_result_to_feature_collection instead",
DeprecationWarning,
)
result_url = self.get_operation_results(operation_id)["url"]
logger.debug("Trying to download result %s.." % result_url)
_download_to_file(result_url, filename)
[docs]
def download_result_to_feature_collection(self, operation_id: str, filename: str):
"""
Downloads the results from a detection operation to a local GeoJSON file.
Results are stored as a FeatureCollection of Multipolygon. Each feature has a 'class_name'
property indicating the corresponding class name
Args:
operation_id: The id of the operation to download. This should be a
detect operation
filename: The local filename where to save the results
"""
results = self.get_operation_results(operation_id)
# We download results to a temporary directory and then assemble them into a
# FeatureCollection
fc: FeatureCollection = {"type": "FeatureCollection", "features": []}
for i, class_result in enumerate(results["by_class"]):
with tempfile.NamedTemporaryFile() as f:
_download_to_file(class_result["result"]["url"], f.name)
# Reopen in read text
with open(f.name) as fr:
multipolygon = json.load(fr)
fc["features"].append(
{
"type": "Feature",
"properties": {"class_name": class_result["class"]["name"]},
"geometry": multipolygon,
}
)
with open(filename, "w") as f:
json.dump(fc, f)
[docs]
def download_operation_results_to_file(self, operation_id: str, filename: str):
"""
Downloads the results URL to a local GeoJSON file
Args:
operation_id: The id of the operation to download
filename: The local filename where to save the results
"""
data = self.get_operation_results_url(operation_id)
with open(filename, "w") as f:
f.write(data)
[docs]
def get_operation_results(self, operation_id: str) -> dict[str, Any]:
"""
Return the 'results' dict of an operation
This a **beta** function, subject to change.
Args:
operation_id: The id of the operation
"""
resp = self.sess.get(
self._api_url("operations/%s/" % operation_id),
)
return resp.json()["results"]
[docs]
def get_operation_results_url(self, operation_id: str) -> str:
"""
Get the URL of a set of operation results
This a **beta** function, subject to change.
Args:
operation_id: The id of the result
"""
return self.get_operation_results(operation_id)["url"]
[docs]
def set_annotations(
self,
detector_id: str,
raster_id: str,
annotation_type: Literal[
"outline", "training_area", "testing_area", "validation_area"
],
annotations: dict[str, Any],
):
"""
Replaces the annotations of type 'annotation_type' with 'annotations', for the
given raster-detector pair.
Args:
detector_id: The id of the detector
raster_id: The id of the raster
annotation_type: One of (outline, training_area, testing_area, validation_area)
annotations: GeoJSON representation of the features to upload
"""
# Get an upload url
create_upload_resp = self.sess.post(
self._api_url(
"detectors/%s/training_rasters/%s/%s/upload/bulk/"
% (detector_id, raster_id, annotation_type)
)
)
if not create_upload_resp.ok:
raise APIError(create_upload_resp.text)
upload = create_upload_resp.json()
upload_url = upload["upload_url"]
upload_id = upload["upload_id"]
# Given we do not use self.sess the timeout is disabled (requests default), and this
# is good as file upload can take a long time
upload_resp = requests.put(upload_url, json=annotations)
if not upload_resp.ok:
logger.error(
"Error when sending annotation upload %s to blobstore at url %s"
% (upload_id, upload_url)
)
raise APIError(upload_resp.text)
# Commit upload
commit_upload_resp = self.sess.post(
self._api_url(
"detectors/%s/training_rasters/%s/%s/upload/bulk/%s/commit/"
% (detector_id, raster_id, annotation_type, upload_id)
)
)
if not commit_upload_resp.ok:
raise APIError(commit_upload_resp.text)
# Poll for operation completion
self._wait_until_operation_completes(commit_upload_resp.json())
[docs]
def train_detector(self, detector_id: str):
"""
Start the training of a detector
Args:
detector_id: The id of the detector
"""
resp = self.sess.post(self._api_url("detectors/%s/train/" % detector_id))
if not resp.ok:
raise APIError(resp.text)
return self._wait_until_operation_completes(resp.json())
[docs]
def run_dataset_recommendation(self, detector_id: str):
"""
This is an **experimental** feature
Runs dataset recommendation on a detector. Note that you currently have to use
the UI to be able to view the recommendation markers/report.
Args:
detector_id: The id of the detector
"""
resp = self.sess.post(
self._api_url("detectors/%s/dataset_recommendation/" % detector_id)
)
if not resp.ok:
raise APIError(resp.text)
return self._wait_until_operation_completes(resp.json())
[docs]
def upload_vector_layer(
self,
raster_id: str,
filename: str,
name: str | None = None,
color: str | None = None,
) -> str:
"""
Uploads a vector layer from a GeoJSON file
This a **beta** function, subject to change.
Args:
raster_id: The id of the raster we want to attach the vector layer to
filename: Path to the local GeoJSOn file we want to upload
name: Optional name to give to the vector layer
color: Optional color of the vector layer, has an HTML hex color code (eg "#aabbcc")
Returns;
the vector layer unique identifier
"""
resp = self.sess.post(self._api_url("vector_layers/%s/upload/" % raster_id))
if not resp.ok:
raise APIError(resp.text)
upload = resp.json()
upload_id, upload_url = upload["upload_id"], upload["upload_url"]
_upload_file_to_blobstore(upload_url, filename)
data = {}
if name is not None:
data["name"] = name
if color is not None:
data["color"] = color
resp = self.sess.post(
self._api_url(
"vector_layers/%s/upload/%s/commit/" % (raster_id, upload_id)
),
json=data,
)
if not resp.ok:
raise APIError(resp.text)
op = self._wait_until_operation_completes(resp.json())
return op["results"]["vector_layer_id"]
[docs]
def edit_vector_layer(
self, vector_layer_id: str, name: str | None = None, color: str | None = None
):
"""
Edits a vector layer
This a **beta** function, subject to change.
Args:
vector_layer_id: The id of the vector layer to remove
name: new name
color: new color
"""
data = {}
if name:
data.update({"name": name})
if color is not None:
data.update({"color": color})
resp = self.sess.put(
self._api_url("vector_layers/%s/" % vector_layer_id), json=data
)
if not resp.ok:
raise APIError(resp.text)
[docs]
def delete_vector_layer(self, vector_layer_id: str):
"""
Removes a vector layer
This a **beta** function, subject to change.
Args:
vector_layer_id: The id of the vector layer to remove
"""
resp = self.sess.delete(self._api_url("vector_layers/%s/" % vector_layer_id))
if not resp.ok:
raise APIError(resp.text)
[docs]
def download_vector_layer_to_file(self, vector_layer_id: str, filename: str):
"""
Downloads a vector layer
This a **beta** function, subject to change.
Args:
vector_layer_id: The id of the vector layer to download
filename: existing file to save the vector layer in
"""
resp = self.sess.get(self._api_url("vector_layers/%s/" % vector_layer_id))
if not resp.ok:
raise APIError(resp.text)
urls = resp.json()["geojson_urls"]
final_fc: FeatureCollection = {"type": "FeatureCollection", "features": []}
for url in urls:
with tempfile.NamedTemporaryFile("w+") as f:
_download_to_file(url, f.name)
fc = json.load(f)
for feature in fc["features"]:
final_fc["features"].append(feature)
with open(filename, "w") as fp:
json.dump(final_fc, fp)
[docs]
def list_raster_markers(
self,
raster_id: str,
page_number: int | None = None,
):
"""
This a **beta** function, subject to change.
List all the markers on a raster, see `ResultsPage` for the pagination access pattern.
Args:
raster_id: The id of the raster
page_number: Optional page (from 1) of the list we want to retrieve
"""
return self._return_results_page(
"rasters/%s/markers/" % raster_id,
{"page_number": page_number} if page_number is not None else None,
)
[docs]
def create_marker(
self,
raster_id: str,
detector_id: str | None,
lng: float,
lat: float,
text: str,
) -> dict[str, Any]:
"""
This is an **experimental** (beta) feature
Creates a marker
Args:
raster_id: The id of the raster (belonging to detector) to create the marker on
detector_id: The id of the detector to create the marker on. If this is None, the marker
is created associated with the raster only
Raises:
APIError: There was an error while creating the marker
"""
if detector_id is None:
url = "rasters/%s/markers/" % raster_id
else:
url = "detectors/%s/training_rasters/%s/markers/" % (detector_id, raster_id)
data = {
"marker": {"type": "Point", "coordinates": [lng, lat]},
"text": text,
}
resp = self.sess.post(self._api_url(url), json=data)
if not resp.ok:
raise APIError(resp.text)
return resp.json()
[docs]
def import_raster_from_remote_source(
self,
raster_name: str,
folder_id: str,
source_id: str,
aoi_filename: str,
method: Literal["streaming"] = "streaming",
) -> str:
"""
This is an experimental feature
Import a raster from a remote imagery source given a GeoJSON file for the AOI
Args:
raster_name: Name of the new raster
folder_id: The id of the folder / project the raster will live in
source_id: The id of the remote imagery source to import from
filename: The filename of a GeoJSON file. This should contain a FeatureCollection of
Polygon/MultiPolygon representing the AOI of the new raster
Raises:
APIError: There was an error during import
"""
# Get upload URL
resp = self.sess.post(self._api_url("rasters/import/"))
if not resp.ok:
raise APIError(resp.text)
data = resp.json()
upload_url = data["upload_url"]
upload_id = data["upload_id"]
# Upload to blobstore
_upload_file_to_blobstore(upload_url, aoi_filename)
# Commit upload
resp = self.sess.post(
self._api_url(f"rasters/import/{upload_id}/commit/"),
json={
"method": method,
"source_id": source_id,
"folder_id": folder_id,
"name": raster_name,
},
)
if not resp.ok:
raise APIError(resp.text)
# Poll operation and get raster identifier
operation = self._wait_until_operation_completes(resp.json())
return operation["metadata"]["raster_id"]
[docs]
def list_raster_vector_layers(
self,
raster_id: str,
search: str | None = None,
detector_id: str | None = None,
page_number: int | None = None,
):
"""
This a **beta** function, subject to change.
List all the vector layers on a raster, see `ResultsPage`
for the pagination access pattern.
Args:
raster_id: The id of the raster
search: Optional string to search layers by name
page_number: Optional page (from 1) of the list we want to retrieve
"""
params: dict[str, str | int] = {}
if search is not None:
params["search"] = search
if detector_id is not None:
params["detector"] = detector_id
if page_number is not None:
params["page_number"] = page_number
url = "rasters/%s/vector_layers/" % raster_id
return self._return_results_page(url, params)