Hi all, I want to write about geospatial interpolation methods and the underlying math. Potentially, in a follow-up blog post, I’ll also discuss examples, but in this post, I’ll confine myself to some of the theory, introducing concepts like the variogram and the kriging equations: terms I wasn’t familiar with until very recently. So this post serves as a consolidation of my newly obtained knowledge. It can also serve as background to a text such as this.
What is Spatial Interpolation?
Spatial interpolation is the prediction of a value \(Z\) at a location \(s\), denoted \(Z(s)\), on the basis of observed data points \(Z(s_i)\). As a motivating example, and perhaps the simplest form of spatial interpolation, we can give inverse distance weighting:
\[
\hat{Z}(s) = \frac{\sum_{i=1}^n w_i Z(s_i)}{\sum_{i=1}^n w_i},
\]
with \(w_i = | s - s_i |^{-p}\) and, usually, the “inverse distance power” \(p\) equal to two, although the weights can also be optimized using cross-validation. We should think of \(s\) as a pixel in a raster data set, for which we have observed points \(s_i\) available and want to interpolate to all other pixels, one such pixel being denoted \(s\). This is also the notation I follow in the remainder of this post: \(s\) is an arbitrary pixel we want to interpolate to.
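To make this concrete, here is a minimal inverse distance weighting sketch in base R; the coordinates and values are made up for illustration.
# Minimal inverse distance weighting in base R (toy data, p = 2)
set.seed(1)
obs <- data.frame(x = runif(5), y = runif(5), z = rnorm(5))  # observed locations s_i with values Z(s_i)

idw <- function(x0, y0, obs, p = 2) {
  d <- sqrt((obs$x - x0)^2 + (obs$y - y0)^2)  # distances |s - s_i|
  w <- d^(-p)                                 # inverse distance weights
  sum(w * obs$z) / sum(w)                     # weighted average of observed values
}

idw(0.5, 0.5, obs)  # interpolated value at s = (0.5, 0.5)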
Spatial Covariance and the Variogram
The main method (as far as I can tell) used to interpolate geospatially is not the preceding one, even though it can be used, sometimes with good results. Rather, the main method starts from a stationary model such as this:
\[
Z(s) = m + e(s)
\]
where \(m\) is the mean of the value \(Z\), and \(e(s)\) is a mean-zero error term. One object of interest here is the covariance of the error term \(e(s)\). I referred to the model above as stationary precisely because of the structure imposed on this covariance matrix. In the standard model, it looks as follows. For \(n\) observed locations \(s_1, \dots, s_n\), the spatial covariance matrix \(C\) is an \(n \times n\) matrix in which each element is the covariance between \(e(s_i)\) and \(e(s_j)\):
\[
C = \begin{pmatrix}
C(h_{11}) & C(h_{12}) & \cdots & C(h_{1n}) \\
C(h_{21}) & C(h_{22}) & \cdots & C(h_{2n}) \\
\vdots & \vdots & \ddots & \vdots \\
C(h_{n1}) & C(h_{n2}) & \cdots & C(h_{nn})
\end{pmatrix},
\]
where \(h_{ij} = \|s_i - s_j\|\) is the distance between locations \(s_i\) and \(s_j\). Implicit in this matrix, there is a covariance function \(C(h)\), which defines the covariance between two observations solely as a function of the distance \(h\) between them.
The covariance function \(C(h)\), with \(h\) being the distance between an arbitrary point \(s\) and the point \(s+h\), is defined as:
\[
C(h) = \operatorname{Cov}\!\left(e(s), e(s+h)\right) = E[e(s)e(s+h)] - E[e(s)]\,E[e(s+h)].
\]
Since we have imposed that \(E[e(s)]=0\), this simplifies to:
\[
C(h)=E[e(s)e(s+h)].
\]
Note also that \(C(0)\) represents the variance of \(e(s)\). The variogram captures the relationship between \(C(h)\) and \(C(0)\), or more precisely their difference. The variogram is defined as:
\[
\gamma(h) = \tfrac{1}{2} E\!\left[(e(s) - e(s+h))^2\right] = \tfrac{1}{2} E\!\left[(Z(s) - Z(s+h))^2\right].
\]
Essentially, the variogram can be thought of as a continuous function of \(h\) describing how the expected (squared) difference between two observations grows with the distance between them. The variogram \(\gamma (h)\) and the covariance \(C(h)\) are mathematically related as:
\[
\gamma(h) = C(0) - C(h).
\]
To see that, expand the square in the definition of the variogram:
\[
\gamma(h) = \tfrac{1}{2} E\!\left[(e(s) - e(s+h))^2\right]
= \tfrac{1}{2}\left(E[e(s)^2] + E[e(s+h)^2]\right) - E[e(s)e(s+h)]
= C(0) - C(h),
\]
where the last step uses that, under stationarity, \(E[e(s)^2] = E[e(s+h)^2] = C(0)\) and \(E[e(s)e(s+h)] = C(h)\).
The variogram is a theoretical construct, but can also be estimated, for example, as follows. Let \(h_i\) be a distance interval from \(h_{i0}\) to \(h_{i1}\):
\[
\hat{\gamma}(h_i) = \frac{1}{2N(h_i)} \sum_{j=1}^{N(h_i)} \left(z(s_j) - z(s_j + h')\right)^2 \quad \text{for } h_{i0} < h' \le h_{i1},
\]
where the sum runs over all pairs of observations whose separation distance \(h'\) falls in the interval \(h_i\), and \(N(h_i)\) is the number of such sample pairs available for that interval.
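To see what this estimator does mechanically, here is a small base-R sketch that bins half squared differences by distance; the data are simulated white noise and the bin width is chosen arbitrarily, so the resulting sample variogram is roughly flat at the variance of \(z\).
# Sample variogram by hand: average half squared differences per distance bin
set.seed(7)
obs <- data.frame(x = runif(100), y = runif(100), z = rnorm(100))

D  <- as.matrix(dist(obs[, c("x", "y")]))  # pairwise distances h'
G  <- 0.5 * outer(obs$z, obs$z, "-")^2     # half squared differences for each pair
ut <- upper.tri(D)                         # use each pair only once

bins      <- cut(D[ut], breaks = seq(0, 0.5, by = 0.05))  # distance intervals h_i
gamma_hat <- tapply(G[ut], bins, mean)                    # sample variogram per interval
plot(seq(0.025, 0.475, by = 0.05), gamma_hat,
     xlab = "distance h", ylab = "semivariance")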
Why do we need the variogram? The variogram allows you to estimate the covariance function \(C(h)\), and hence the covariance matrix, in an easy and convenient way, by fitting a curve to the sample variogram. Additionally, it turns out that when interpolating spatially using a method called kriging, the variogram form is convenient, as we will see.
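In practice, gstat does the binning and the curve fitting for you. Here is a minimal sketch using its built-in meuse data set; the variable and the initial model parameters are just for demonstration.
library(sp)
library(gstat)
data(meuse)
coordinates(meuse) <- ~x + y  # turn the data.frame into a SpatialPointsDataFrame

# sample variogram of log(zinc), then fit a spherical model to it
v     <- variogram(log(zinc) ~ 1, meuse)
v_fit <- fit.variogram(v, vgm(psill = 1, model = "Sph", range = 900, nugget = 1))
plot(v, v_fit)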
The Kriging System Setup
We aim to predict \(Z(s)\) as a weighted sum of the observed values \(Z(s_i)\):
\[
Z (s) = \sum_{i=1}^n \lambda_i Z (s_i)
\]
while ensuring two things:
The prediction is unbiased, meaning \(E\!\left[\sum_{i=1}^n \lambda_i Z(s_i)\right] = E[Z(s)] = m\).
The prediction minimizes the variance of the error.
The unbiasedness condition requires that the weights sum to one: since \(E\!\left[\sum_{i=1}^n \lambda_i Z(s_i)\right] = m \sum_{i=1}^n \lambda_i\), this equals \(m\) exactly when \(\sum_{i=1}^n \lambda_i = 1\).
To minimize the prediction error variance, we use the spatial correlation structure encoded by the variogram. For each location \(s\) to be interpolated, the kriging weights \(\lambda_i\) are chosen such that the variance of the prediction error,
\[
\operatorname{Var}\!\left(Z(s) - \sum_{i=1}^n \lambda_i Z(s_i)\right),
\]
is minimized.
That leads to the so-called kriging system, which provides a solution for the weights \(\lambda_i\). I’ll first state this system of equations, then explain where it comes from. For each observed point \(s_i\), the kriging system requires the following relationship to hold between the variogram values for the distances from that point to all observed points and the variogram value for the distance between \(s_i\) and the point to be interpolated, \(s_*\):
\[
\sum_{j=1}^n \lambda_j \gamma(h_{ij}) + \mu = \gamma(h_{i*}), \qquad i = 1, \dots, n,
\]
where \(\gamma (h_{ij})\) is the variogram value for the distance between \(s_i\) and \(s_j\), \(\gamma (h_{i*})\) is the variogram value for the distance between \(s_i\) and the prediction location \(s_*\), and \(\mu\) is a Lagrange multiplier enforcing the constraint \(\sum_{j=1}^n \lambda_j = 1\). That constraint is the second component of the kriging system. Since this relationship holds for all observed points \(s_i\), we can rewrite the whole system using linear algebra. But mind that this is a separate system for each point \(s_*\) to be interpolated:
Let \(\Gamma\) be the matrix of variogram values between all observed points, and let \(\gamma_* = (\gamma(h_{1*}), \dots, \gamma(h_{n*}))^T\) collect the variogram values between the observed points and \(s_*\). The kriging system then reads
\[
\begin{pmatrix} \Gamma & \mathbf{1} \\ \mathbf{1}^T & 0 \end{pmatrix}
\begin{pmatrix} \lambda \\ \mu \end{pmatrix}
=
\begin{pmatrix} \gamma_* \\ 1 \end{pmatrix},
\]
where \(\mathbf{1}\) is a vector of ones and \(\lambda = (\lambda_1, \dots, \lambda_n)^T\).
Intuitively, it makes sense that the variogram value between any observed point \(s_i\) and a point to be interpolated \(s_*\) is some weighted combination of the variogram values among the observed points, since variograms are a function of distances only. But a more formal derivation is as follows. Using the variogram, the prediction error variance can be written as
\[
\sigma_E^2(s_*) = 2\sum_{i=1}^n \lambda_i \gamma(h_{i*}) - \sum_{i=1}^n \sum_{j=1}^n \lambda_i \lambda_j \gamma(h_{ij}),
\]
which we minimize subject to \(\sum_{i=1}^n \lambda_i = 1\) via the Lagrangian
\[
L(\lambda, \mu) = 2\sum_{i=1}^n \lambda_i \gamma(h_{i*}) - \sum_{i=1}^n \sum_{j=1}^n \lambda_i \lambda_j \gamma(h_{ij}) - \mu\left(\sum_{i=1}^n \lambda_i - 1\right).
\]
Setting the partial derivative with respect to each \(\lambda_i\) to zero gives
\[
2\gamma(h_{i*}) - 2\sum_{j=1}^n \lambda_j \gamma(h_{ij}) - \mu = 0,
\]
whereas differentiating with respect to the Lagrange multiplier \(\mu\) just gives us back our constraint. Rescaling the Lagrange multiplier (\(\mu/2 \to \mu\)) and writing these \(n\) equations as a system, combined with the constraint, gives us the matrix formulation seen above. Since all variogram values are known once a variogram model has been fitted, we can simply solve the kriging system for \((\lambda, \mu)\) and obtain our weights \(\lambda\).
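To make the linear algebra concrete, here is a small base-R sketch that builds \(\Gamma\) from an assumed exponential variogram and solves the ordinary kriging system for a single prediction point; the toy data and the variogram parameters are made up for illustration.
# Ordinary kriging weights by hand, for one prediction point s*
set.seed(42)
obs    <- data.frame(x = runif(6), y = runif(6), z = rnorm(6))
s_star <- c(0.5, 0.5)

# an assumed (already fitted) exponential variogram: nugget + partial sill * (1 - exp(-h / range))
gamma_fun <- function(h, nugget = 0.1, psill = 1, range = 0.3) {
  nugget * (h > 0) + psill * (1 - exp(-h / range))
}

n     <- nrow(obs)
D     <- as.matrix(dist(obs[, c("x", "y")]))  # pairwise distances h_ij
Gamma <- gamma_fun(D)                         # variogram values between observed points

# augmented system: [Gamma 1; 1' 0] %*% c(lambda, mu) = c(gamma_*, 1)
A      <- rbind(cbind(Gamma, 1), c(rep(1, n), 0))
d_star <- sqrt((obs$x - s_star[1])^2 + (obs$y - s_star[2])^2)
b      <- c(gamma_fun(d_star), 1)

sol    <- solve(A, b)
lambda <- sol[1:n]      # kriging weights
mu     <- sol[n + 1]    # Lagrange multiplier

sum(lambda)             # check: the weights sum to 1
sum(lambda * obs$z)     # kriged prediction at s*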
Implementation in R
See below for a code implementation of kriging in R; in this case spatio-temporal kriging with gstat's krigeST on simulated space-time data.
# Source: https://github.com/r-spatial/gstat/pull/108
## FNN local prediction
########################
library(sp)
library(stars)
library(spacetime)
library(gstat)
library(lattice)

# create n space-time points over [0,1] x [0,1] x [Now, Now + some days]
t0 = Sys.time() # now
n = 1000
set.seed(13131) # fix outcomes
x = runif(n)
y = runif(n)
t = t0 + 1e6 * runif(n)
z = rnorm(n)
stidf = STIDF(SpatialPoints(cbind(x, y)), sort(t), data.frame(z = z))
stplot(stidf, number = 21, main = "random spatio-temporal noise")
sft = st_as_sftime(stidf)
sft # my actual data

# create a regular 20 x 20 x 10 grid of prediction locations:
grd = as(SpatialGrid(GridTopology(c(0.025, 0.025), c(.05, .05), c(20, 20))), "SpatialPixels")
tgrd = seq(min(t) + 10000, max(t) - 10000, length.out = 10)
stf = STF(grd, tgrd)
# stf = STFDF(grd, tgrd, data.frame(x = rep(0, 400 * 10)))
st = st_as_stars(stf)

# define a variogram model
sumMetricModel <- vgmST("sumMetric",
  space = vgm(1/6, "Sph", 0.25, 1/60),
  time  = vgm(2/6, "Exp", 1e5, 1/60),
  joint = vgm(0.4, "Exp", 0.3, 0.1),
  stAni = 1/1e6)
attr(sumMetricModel, "temporal unit") <- "secs"

locKrig_sft <- gstat::krigeST(z ~ 1, sft, st, sumMetricModel, nmax = 20, computeVar = TRUE)
plot(sft)            # actual data
plot(locKrig_sft[1]) # interpolated and extrapolated data
If you want to try it on real data, here are two Python scripts to download historical weather data: the first scrapes German station data from Berkeley Earth, the second downloads Dutch data from the KNMI Data Platform.
import pandas as pd
import requests
import re
from selenium import webdriver
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Create a Service object with the path to the geckodriver
service = Service(executable_path="./geckodriver")

# Initialize the Firefox WebDriver with the Service object
driver = webdriver.Firefox(service=service)

# Open the webpage
driver.get("https://berkeleyearth.org/temperature-station-list/?phrase=Germany")

# Wait until the table with class "table" is present on the page
WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.CLASS_NAME, "table"))
)

# Find the table with class "table"
table = driver.find_element(By.CLASS_NAME, "table")

# Extract table rows
rows = table.find_elements(By.TAG_NAME, "tr")

# Extract headers
headers = [header.text for header in rows[0].find_elements(By.TAG_NAME, "th")]

# Extract table data
data = []
for row in rows[1:]:
    cols = row.find_elements(By.TAG_NAME, "td")
    data.append([col.text for col in cols])

# Create a DataFrame
df = pd.DataFrame(data, columns=headers)

# Append the DataFrame with hyperlinks
hyperlink_els = driver.find_elements(by=By.CSS_SELECTOR, value="tr td:nth-of-type(1) a")
hyperlinks = [link.get_attribute("href") for link in hyperlink_els]
df["links"] = hyperlinks

driver.quit()

# Now, scrape the data for each of those hyperlinks
def scrape_data(link):
    # Step 1: Read the file content
    number = re.findall(r'\d+', link)[0]
    url = ''.join(['https://data.berkeleyearth.org/auto/Stations/TAVG/Text/', number, '-TAVG-Data.txt'])
    response = requests.get(url)
    lines = response.text.split('\n')
    if len(lines) < 2:
        return None

    # Step 2: Extract metadata
    metadata = {}
    data_lines = []
    for line in lines:
        if line.startswith('%'):
            if 'Primary Name:' in line:
                metadata['Primary Name'] = line.split(':', 1)[1].strip()
            elif 'Latitude:' in line:
                metadata['Latitude'] = line.split(':', 1)[1].strip()
            elif 'Longitude:' in line:
                metadata['Longitude'] = line.split(':', 1)[1].strip()
            elif 'Year' in line:
                colnames = [i.strip() for i in line.split(sep=',')]
        else:
            # Collect data lines
            if line.strip():
                data_lines.append(line)

    # Step 3: Create a data.frame
    df = pd.DataFrame([line.split() for line in data_lines], columns=colnames)

    # Step 4: Add metadata as columns to the DataFrame
    df['Primary name'] = metadata.get('Primary Name', None)
    df['Latitude'] = metadata.get('Latitude', None)
    df['Longitude'] = metadata.get('Longitude', None)

    # Step 5: Export this to .csv
    title = metadata['Primary Name'].replace(' ', '').replace('/', '').strip().lower()
    df.to_csv(f'./{title}.csv', index=False)
    print(f"Exported {title}")
    return df

df.links.apply(scrape_data)
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple

import requests
from requests import Session

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))


def download_dataset_file(
    session: Session,
    base_url: str,
    dataset_name: str,
    dataset_version: str,
    filename: str,
    directory: str,
    overwrite: bool,
) -> Tuple[bool, str]:
    # if a file from this dataset already exists, skip downloading it.
    file_path = Path(directory, filename).resolve()
    if not overwrite and file_path.exists():
        logger.info(f"Dataset file '{filename}' was already downloaded.")
        return True, filename

    endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
    get_file_response = session.get(endpoint)

    # retrieve download URL for dataset file
    if get_file_response.status_code != 200:
        logger.warning(f"Unable to get file: {filename}")
        logger.warning(get_file_response.content)
        return False, filename

    # use download URL to GET dataset file. We don't need to set the 'Authorization' header,
    # the presigned download URL already has permissions to GET the file contents
    download_url = get_file_response.json().get("temporaryDownloadUrl")
    return download_file_from_temporary_download_url(download_url, directory, filename)


def download_file_from_temporary_download_url(download_url, directory, filename):
    try:
        with requests.get(download_url, stream=True) as r:
            r.raise_for_status()
            with open(f"{directory}/{filename}", "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
    except Exception:
        logger.exception("Unable to download file using download URL")
        return False, filename

    logger.info(f"Downloaded dataset file '{filename}'")
    return True, filename


def list_dataset_files(
    session: Session,
    base_url: str,
    dataset_name: str,
    dataset_version: str,
    params: Dict[str, str],
) -> Tuple[List[str], Dict[str, Any]]:
    logger.info(f"Retrieve dataset files with query params: {params}")

    list_files_endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files"
    list_files_response = session.get(list_files_endpoint, params=params)

    if list_files_response.status_code != 200:
        raise Exception("Unable to list initial dataset files")

    try:
        list_files_response_json = list_files_response.json()
        dataset_files = list_files_response_json.get("files")
        dataset_filenames = list(map(lambda x: x.get("filename"), dataset_files))
        return dataset_filenames, list_files_response_json
    except Exception as e:
        logger.exception(e)
        raise Exception(e)


def get_max_worker_count(filesizes):
    size_for_threading = 10_000_000  # 10 MB
    average = sum(filesizes) / len(filesizes)
    # to prevent downloading multiple half files in case of a network failure with big files
    if average > size_for_threading:
        threads = 1
    else:
        threads = 10
    return threads


async def main():
    api_key = "eyJvcmciOiI1ZTU1NGUxOTI3NGE5NjAwMDEyYTNlYjEiLCJpZCI6ImE1OGI5NGZmMDY5NDRhZDNhZjFkMDBmNDBmNTQyNjBkIiwiaCI6Im11cm11cjEyOCJ9"
    dataset_name = "antieke_reeksen"
    dataset_version = "v1.0"
    base_url = "https://api.dataplatform.knmi.nl/open-data/v1"

    # When set to True, if a file with the same name exists the output is written over the file.
    # To prevent unnecessary bandwidth usage, leave it set to False.
    overwrite = False
    download_directory = "./data/"

    # Make sure to send the API key with every HTTP request
    session = requests.Session()
    session.headers.update({"Authorization": api_key})

    # Verify that the download directory exists
    if not Path(download_directory).is_dir() or not Path(download_directory).exists():
        raise Exception(f"Invalid or non-existing directory: {download_directory}")

    filenames = []
    max_keys = 500
    next_page_token = None
    file_sizes = []

    # Use the API to get a list of all dataset filenames
    while True:
        # Retrieve dataset files after given filename
        dataset_filenames, response_json = list_dataset_files(
            session,
            base_url,
            dataset_name,
            dataset_version,
            {"maxKeys": f"{max_keys}", "nextPageToken": next_page_token},
        )
        file_sizes.extend(file["size"] for file in response_json.get("files"))

        # Store filenames
        filenames += dataset_filenames

        # If the result is not truncated, we retrieved all filenames
        next_page_token = response_json.get("nextPageToken")
        if not next_page_token:
            logger.info("Retrieved names of all dataset files")
            break

    logger.info(f"Number of files to download: {len(filenames)}")
    worker_count = get_max_worker_count(file_sizes)
    loop = asyncio.get_event_loop()

    # Allow up to 10 separate threads to download dataset files concurrently
    executor = ThreadPoolExecutor(max_workers=worker_count)
    futures = []

    # Create tasks that download the dataset files
    for dataset_filename in filenames:
        # Create future for dataset file
        future = loop.run_in_executor(
            executor,
            download_dataset_file,
            session,
            base_url,
            dataset_name,
            dataset_version,
            dataset_filename,
            download_directory,
            overwrite,
        )
        futures.append(future)

    # Wait for all tasks to complete and gather the results
    future_results = await asyncio.gather(*futures)
    logger.info(f"Finished '{dataset_name}' dataset download")

    failed_downloads = list(filter(lambda x: not x[0], future_results))
    if len(failed_downloads) > 0:
        logger.warning("Failed to download the following dataset files:")
        logger.warning(list(map(lambda x: x[1], failed_downloads)))


if __name__ == "__main__":
    asyncio.run(main())
Extensions
If you have other variables at your disposal for the entire grid, that is, including the points you want to interpolate, you could potentially extend the model with covariates:
\[
Z(s) = \sum_{j=0}^p \beta_j X_j (s) + e(s)
\]
In this case, the strategy boils down to modeling the (conditional) spatial covariance of the residuals, after accounting for the covariates; a short gstat sketch of this is given at the end of this section. Similarly, we can potentially extend the covariance structure by allowing the spatial covariance to be a function of time:
\[
Z(s, t) = m + e(s, t)
\]
In the simplest case, this boils down to estimating a separate variogram for each discrete time point \(t\) and solving the kriging system using that particular variogram; more generally, a joint spatio-temporal variogram model is fitted, such as the sum-metric model used in the R code above.
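Returning to the covariate extension: in gstat this amounts to putting the covariates into the kriging formula, provided they are available at every prediction location. A minimal sketch with the built-in meuse data, using the distance-to-river covariate that is available on the prediction grid; the variogram parameters are just starting values for the fit.
library(sp)
library(gstat)
data(meuse)
data(meuse.grid)
coordinates(meuse)      <- ~x + y
coordinates(meuse.grid) <- ~x + y
gridded(meuse.grid)     <- TRUE

# variogram of the residuals after regressing log(zinc) on the covariate
v     <- variogram(log(zinc) ~ sqrt(dist), meuse)
v_fit <- fit.variogram(v, vgm(psill = 1, model = "Exp", range = 300, nugget = 1))

# universal kriging / kriging with external drift: the covariate is known on the whole grid
ked <- krige(log(zinc) ~ sqrt(dist), meuse, meuse.grid, model = v_fit)
spplot(ked["var1.pred"])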
Conclusion
I have attempted to lay out the mathematical underpinnings of basic geospatial interpolation models and explain the logic on which they’re based. I hope this was useful as a more formal grounding for methods that can seem arbitrary at first sight. If you have any recommendations or know of good texts on this subject, make sure to let me know. Thanks for reading!