Geospatial Interpolation: Some Theory

Author

Bas Machielsen

Published

December 5, 2024

Introduction

Hi all, I want to write about geospatial interpolation methods and the underlying math. Potentially, in a follow-up blog post, I’ll also discuss examples, but in this post, I’ll confine myself to some of the theory, introducing concepts like the variogram and the kriging equations: terms which I wasn’t familiar with until very recently. So this post serves as a consolidation of my newly obtained knowledge. It can serve as a background to a text such as this.

What is Spatial Interpolation?

Spatial interpolation is evaluating/predicting a particular value \(Z\) at a point \(s\), denoted \(Z(s)\), on the basis of observed data points \(s_i\). As a motivating example, and potentially the simplest form of spatial interpolation, we can give the following:

\[ Z(s) = \frac{\sum_{i=1}^N w_i Z(s_i)}{\sum_{i=1}^N w_i} \]

with \(w_i = \| s - s_i \|^{-p}\), where the “inverse distance power” \(p\) is usually set equal to two, or potentially optimized using cross-validation. We should think of \(s\) as a pixel in a raster data set: we have observed points \(s_i\), and we want to interpolate their values to all other pixels, one such pixel being denoted \(s\). This is also the notation that I follow in the remainder of this post: \(s\) is an arbitrary pixel we want to interpolate.
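
To make this concrete, here is a minimal sketch of inverse-distance weighting in base R (not the gstat implementation); the toy data, the function name idw_predict, and the choice \(p = 2\) are purely illustrative assumptions:

Code
# Minimal inverse-distance-weighting sketch in base R (illustrative only;
# the toy data, idw_predict() name, and p = 2 are assumptions).
set.seed(1)
obs <- data.frame(x = runif(25), y = runif(25), z = rnorm(25))

idw_predict <- function(x0, y0, obs, p = 2) {
  d <- sqrt((obs$x - x0)^2 + (obs$y - y0)^2)
  if (any(d == 0)) return(obs$z[which.min(d)])  # s coincides with an observation
  w <- d^(-p)
  sum(w * obs$z) / sum(w)
}

# interpolate a single arbitrary pixel s = (0.5, 0.5)
idw_predict(0.5, 0.5, obs)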

Spatial Covariance and the Variogram

The main method (as far as I can tell) used to interpolate geospatially is not the preceding one, even though it can be used, sometimes with good results. Instead, the main method starts from a stationary model such as this:

\[ Z(s) = m + e(s) \]

where \(m\) is the mean of the value \(Z\), and \(e(s)\) is a mean-zero error term. One object of interest here is the covariance of the error term \(e(s)\). I referred to the model above as stationary exactly because of the structure imposed on this covariance matrix. In the standard model, it looks like this. For \(n\) observed locations \(s_1, \dots, s_n\), the spatial covariance matrix \(C\) is an \(n \times n\) matrix where each element represents the covariance between \(e(s_i)\) and \(e(s_j)\):

\[ \mathbf{C} = \begin{bmatrix} C(0) & C(h_{12}) & \dots & C(h_{1n}) \\ \newline C(h_{21}) & C(0) & \dots & C(h_{2n}) \\ \newline \vdots & \vdots & \ddots & \vdots \\ \newline C(h_{n1}) & C(h_{n2}) & \dots & C(0) \end{bmatrix} \]

where \(h_{ij} = ||s_i - s_j||\) is the distance between locations \(s_i\) and \(s_j\). Implicit in this matrix is a covariance function \(C(h)\), which defines the covariance between two observations solely as a function of the distance \(h\) between them.
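
To make the structure of \(\mathbf{C}\) concrete, here is a minimal R sketch that fills in such a matrix from pairwise distances, assuming (purely for illustration) an exponential covariance function \(C(h) = \sigma^2 e^{-h/a}\) with made-up parameter values:

Code
# Build the spatial covariance matrix C from pairwise distances, assuming
# (for illustration only) an exponential covariance function C(h) = sigma2 * exp(-h / a).
set.seed(1)
s      <- cbind(x = runif(5), y = runif(5))  # five observed locations
h      <- as.matrix(dist(s))                 # pairwise distances h_ij
sigma2 <- 1.0                                # C(0), the variance of e(s)
a      <- 0.3                                # assumed range parameter
C      <- sigma2 * exp(-h / a)               # the n x n matrix above
round(C, 3)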

The covariance function \(C(h)\), with \(h\) being the distance between an arbitrary point \(s\) and point \(s+h\), is defined as:

\[ C(h)=\text{Cov}(e(s),e(s+h))=E[e(s)e(s+h)]-E[e(s)]E[e(s+h)] \]

Since we have imposed that \(E(e(s))=0\), this simplifies to:

\[ C(h)=E[e(s)e(s+h)]. \]

Note also that \(C(0)\) represents the variance of \(e(s)\). The variogram is a closely related object that reflects the difference between \(C(0)\) and \(C(h)\). It is defined as:

\[ \gamma (h)=\frac{1}{2} \mathbb{E}[(e(s)-e(s+h))^2] \]

Essentially, the variogram can be thought of as a continuous function of \(h\) that we will later fit to sample-based estimates. The variogram \(\gamma (h)\) and the covariance function \(C(h)\) are mathematically related as:

\[ \gamma(h)=C(0)-C(h). \]

To see that, expand the square in the definition of the variogram:

\[ \gamma (h)=\frac{1}{2}\left(E[e(s)^2]-2E[e(s)e(s+h)]+E[e(s+h)^2]\right). \]

Under stationarity, the random field \(e(s)\) has:

  • Constant variance:

\[ E[e(s)^2]=E[e(s+h)^2]=C(0), \]

  • A covariance function depending only on the distance \(h\), not on \(s\):

\[ E[e(s)e(s+h)]=C(h). \]

Hence \(\gamma (h) = \frac{1}{2} (C(0) - 2 C(h) + C(0)) = C(0) - C(h)\).

The variogram is a theoretical construct, but can also be estimated, for example, as follows. Let \(h_i\) be a distance interval from \(h_{i0}\) to \(h_{i1}\):

\[ \hat{\gamma}(h_i) = \frac{1}{2N(h_i)} \sum_{j=1}^{N(h_i)} (z(s_j) - z(s_j + h'))^2 \text{ for } h_{i0} < h' < h_{i1}. \] \(N(h_i)\) is the number of observations (sample pairs) available for distance interval \(h_i\).

Why do we need the variogram? The variogram allows you to estimate the covariance structure \(C(h)\) in an easy and convenient way, by fitting a parametric curve to the sample variogram. Additionally, when interpolating spatially using a method called kriging, it turns out that the variogram form is the convenient one to work with, as we will see.
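
As a rough sketch of what this looks like in practice, gstat can compute the sample variogram and fit a parametric curve to it; the simulated data and the starting values passed to vgm() below are illustrative assumptions, not recommendations:

Code
# Sketch: compute a binned sample variogram and fit a spherical model to it.
# The simulated data and the vgm() starting values are illustrative assumptions.
library(sp)
library(gstat)

set.seed(1)
d <- data.frame(x = runif(200), y = runif(200), z = rnorm(200))
coordinates(d) <- ~x+y

v_emp <- variogram(z ~ 1, d)                             # sample variogram, binned by distance
v_fit <- fit.variogram(v_emp, vgm(1, "Sph", 0.5, 0.1))   # fit a spherical curve gamma(h)
plot(v_emp, model = v_fit)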

The Kriging System Setup

We aim to predict \(Z(s)\) as a weighted sum of the observed values \(Z(s_i)\):

\[ Z (s) = \sum_{i=1}^n \lambda_i Z (s_i) \]

while ensuring two things:

  • The prediction is unbiased, meaning \(E\left[\sum_{i=1}^N \lambda_i Z(s_i)\right]=E[Z(s)]\).
  • The prediction minimizes the variance of the error.

The unbiasedness condition requires that the weights sum to 1, \(\sum_{i=1}^N \lambda_i = 1\).
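
To see why, note that under the stationary model every \(Z(s_i)\) has the same mean \(m\), so

\[ E\left[\sum_{i=1}^N \lambda_i Z(s_i)\right] = \sum_{i=1}^N \lambda_i E[Z(s_i)] = m \sum_{i=1}^N \lambda_i, \]

which can only equal \(E[Z(s)] = m\) for every possible value of \(m\) if \(\sum_{i=1}^N \lambda_i = 1\).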

To minimize the prediction error variance, we use the spatial correlation structure encoded by the variogram. For each location \(s\) to be interpolated, the kriging weights \(\lambda_i\) are chosen such that the variance of the prediction error:

\[ \text{Var}\left(Z(s) - \sum_{i=1}^N \lambda_iZ(s_i)\right) \]

is minimized.

That leads to the so-called kriging system, which provides a solution for the weights \(\lambda_i\). I’ll first state this system of equations, then explain where it comes from. For each observed point \(s_i\), the kriging system requires the following relationship to hold between the variogram values corresponding to the distances from \(s_i\) to all other observed points and the variogram value corresponding to the distance from \(s_i\) to the point to be interpolated, \(s_*\):

\[ \sum_{j=1}^N \lambda_j \gamma (h_{ij}) + \mu = \gamma (h_{i*}) \]

where \(\gamma (h_{ij})\) is the variogram value for the distance between \(s_i\) and \(s_j\), \(\gamma (h_{i*})\) is the variogram value for the distance between \(s_i\) and the prediction location \(s_*\), and \(\mu\) is a Lagrange multiplier enforcing the constraint \(\sum_{j=1}^N \lambda_j = 1\). The constraint \(\sum_{j=1}^N \lambda_j = 1\) is the second component of the kriging system. Since this relationship holds for all observed points \(s_i\), we can rewrite this using linear algebra. But keep in mind that this is a separate system for each point \(s_*\) to be interpolated:

Let \(\Gamma\) be a matrix with the variogram values between all observed points:

\[ \Gamma = \begin{bmatrix} \gamma (h_{11}) & \gamma (h_{12}) & \dots & \gamma (h_{1n}) \\ \newline \gamma (h_{21}) & \gamma (h_{22}) & \dots & \gamma (h_{2n}) \\ \newline \vdots & \vdots & \ddots & \vdots \\ \newline \gamma (h_{n1}) & \gamma (h_{n2}) & \dots & \gamma (h_{nn}) \end{bmatrix} \]

Let \(\gamma_*\) be a column vector of variogram values between the observed points and the location \(s_*\) to be interpolated:

\[ \gamma_* = \begin{bmatrix} \gamma (h_{1*}) \\ \newline \gamma (h_{2*}) \\ \newline \vdots \\ \newline \gamma (h_{n*}) \end{bmatrix} \]

Let \(\lambda\) be a column vector of \(n\) weights, and \(v\) be a row vector of \(n\) ones. The kriging system is then expressed as:

\[ \begin{bmatrix} \Gamma & v^T \\ \newline v & 0 \end{bmatrix} \begin{bmatrix} \lambda \\ \newline \mu \end{bmatrix} = \begin{bmatrix} \gamma_* \\ \newline 1 \end{bmatrix} \]

Intuitively, it makes sense that the variogram value between any observed point \(s_i\) and a point \(s_*\) to be interpolated should be some weighted combination of the variogram values among the observed points, since the variogram is a function of distance only. But a more formal derivation is as follows:

Since we’re minimizing the variance of the prediction error \(e(s_*) = Z(s_*) - \sum_{i=1}^N \lambda_i Z(s_i)\), we have:

\[ \text{Var}(e(s_*))=\text{Var}(Z(s_*)) + \sum_{i=1}^N \sum_{j=1}^N \lambda_i \lambda_j \text{Cov}(Z(s_i),Z(s_j)) - 2 \sum_{i=1}^N \lambda_i \text{Cov}(Z(s_*),Z(s_i)). \]

By our stationarity assumptions, we have:

  • \(\text{Var}(Z(s_*))=C(0)\),
  • \(\text{Cov}(Z(s_*),Z(s_i))=C(h_{i*})\),
  • \(\text{Cov}(Z(s_i),Z(s_j))=C(h_{ij})\).

Hence, the objective function simplifies, and combined with the constraint that \(\sum_{i=1}^N \lambda_i=1\), we have the following Lagrangian:

\[ \mathcal{L}(\lambda, \mu)=C(0)+ \sum_{i=1}^N \sum_{j=1}^N \lambda_i \lambda_j C(h_{ij})-2 \sum_{i=1}^N \lambda_i C(h_{i*})+ \mu \left(1-\sum_{i=1}^N \lambda_i\right). \]

Differentiating this with respect to an arbitrary choice variable \(\lambda_k\), and simplifying, gives:

\[ \sum_{j=1}^N \lambda_j C(h_{kj})-C(h_{k*})-\frac{\mu}{2}=0, \]

whereas differentiating with respect to the Lagrange multiplier \(\mu\) just gives us back our constraint. Rescaling the Lagrange multiplier (\(\tilde{\mu} = \mu/2\)) and substituting \(C(h) = C(0) - \gamma(h)\), which the constraint \(\sum_{j=1}^N \lambda_j = 1\) allows (the \(C(0)\) terms cancel), turns these equations into the variogram form stated earlier. Writing these \(N\) equations together with the constraint as one linear system gives the matrix formulation seen above. Since all of these variogram values can be computed from the fitted variogram model, we can simply solve the kriging system for \((\lambda, \mu)\) and find our weights \(\lambda\).
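
To make the mechanics concrete, here is a minimal base-R sketch that builds \(\Gamma\) and \(\gamma_*\) from an assumed spherical variogram, solves the kriging system for one target location, and returns the prediction; the variogram parameters and the toy data are assumptions, not fitted values:

Code
# Ordinary kriging for a single target location s_*, solving
# [Gamma v^T; v 0] [lambda; mu] = [gamma_*; 1] directly.
# Base R only; the spherical variogram parameters and toy data are assumptions.
set.seed(1)
obs    <- data.frame(x = runif(30), y = runif(30), z = rnorm(30))
s_star <- c(0.5, 0.5)

# assumed spherical variogram: nugget 0.1, partial sill 1, range 0.4
gamma_h <- function(h, nugget = 0.1, psill = 1, range = 0.4) {
  ifelse(h == 0, 0,
         ifelse(h >= range, nugget + psill,
                nugget + psill * (1.5 * h / range - 0.5 * (h / range)^3)))
}

D      <- as.matrix(dist(obs[, c("x", "y")]))                   # distances h_ij
d_star <- sqrt((obs$x - s_star[1])^2 + (obs$y - s_star[2])^2)   # distances h_i*

n   <- nrow(obs)
A   <- rbind(cbind(gamma_h(D), 1), c(rep(1, n), 0))  # [Gamma v^T; v 0]
b   <- c(gamma_h(d_star), 1)                         # [gamma_*; 1]
sol <- solve(A, b)
lambda <- sol[1:n]

sum(lambda * obs$z)  # kriging prediction for Z(s_*)
sum(lambda)          # sanity check: weights sum to 1

Solving the system by hand like this is only meant to show where the weights come from; in practice one would use gstat, which, given the same variogram model, should produce the same ordinary kriging prediction up to numerical details.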

Implementation in R

See below for a code implementation of kriging in R (in this case, local spatio-temporal kriging with gstat).

Code
# Source: https://github.com/r-spatial/gstat/pull/108
## FNN local prediction
########################
library(sp)
library(stars)
library(spacetime)
library(gstat)
library(lattice)
library(sftime) # needed for st_as_sftime() below

# create n space-time points over [0,1] x [0,1] x [Now, Now+some days]
t0 = Sys.time() # now
n = 1000
set.seed(13131) # fix outcomes
x = runif(n)
y = runif(n)
t = t0 + 1e6 * runif(n)
z = rnorm(n)
stidf = STIDF(SpatialPoints(cbind(x,y)), sort(t), data.frame(z=z))

stplot(stidf, number=21, main="random spatio-temporal noise")
sft = st_as_sftime(stidf)
sft # My actual data

# create a regular 20 x 20 x 10 grid of prediction locations:
grd = as(SpatialGrid(GridTopology(c(0.025,0.025), c(.05, .05), c(20,20))), "SpatialPixels")
tgrd = seq(min(t)+10000, max(t)-10000, length.out = 10)

stf = STF(grd, tgrd)
#stf = STFDF(grd, tgrd, data.frame(x=rep(0,400*10)))

st = st_as_stars(stf)

# define a variogram model
sumMetricModel <- vgmST("sumMetric",
                        space=vgm(1/6, "Sph", 0.25, 1/60),
                        time =vgm(2/6, "Exp",  1e5, 1/60),
                        joint=vgm(0.4, "Exp", 0.3, 0.1),
                        stAni=1/1e6)
attr(sumMetricModel, "temporal unit") <- "secs"

locKrig_sft <- gstat::krigeST(z~1, sft, st, sumMetricModel, nmax=20, computeVar = T)

plot(sft) # Actual data
plot(locKrig_sft[1]) # Interpolated and Extrapolated Data

If you want to try it on real data, here are two Python scripts to download German or Dutch historical weather data.

Code
import pandas as pd
import requests
import re

from selenium import webdriver
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Create a Service object with the path to the geckodriver
service = Service(executable_path="./geckodriver")

# Initialize the Firefox WebDriver with the Service object
driver = webdriver.Firefox(service=service)

# Open the webpage
driver.get("https://berkeleyearth.org/temperature-station-list/?phrase=Germany")

# Wait until the table with class "table" is present on the page
WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.CLASS_NAME, "table"))
)

# Find the table with class "table"
table = driver.find_element(By.CLASS_NAME, "table")

# Extract table rows
rows = table.find_elements(By.TAG_NAME, "tr")

# Extract headers
headers = [header.text for header in rows[0].find_elements(By.TAG_NAME, "th")]

# Extract table data
data = []
for row in rows[1:]:
    cols = row.find_elements(By.TAG_NAME, "td")
    data.append([col.text for col in cols])

# Create a DataFrame
df = pd.DataFrame(data, columns=headers)

# Append the DataFrame with hyperlinks
hyperlink_els = driver.find_elements(by=By.CSS_SELECTOR, value = "tr td:nth-of-type(1) a")
hyperlinks = [link.get_attribute("href") for link in hyperlink_els]
df["links"] = hyperlinks

driver.quit()

# Now, scrape the data for each of those hyperlinks
def scrape_data(link):
    # Step 1: Read the File Content
    number = re.findall(r'\d+', link)[0]
    url = ''.join(['https://data.berkeleyearth.org/auto/Stations/TAVG/Text/', number, '-TAVG-Data.txt'])
    response = requests.get(url)
    lines = response.text.split('\n')
    if len(lines) < 2:
        return None
    # Step 2: Extract metadata
    metadata = {}
    data_lines = []
    for line in lines:
        if line.startswith('%'):
            if 'Primary Name:' in line:
                metadata['Primary Name'] = line.split(':', 1)[1].strip()
            elif 'Latitude:' in line:
                metadata['Latitude'] = line.split(':', 1)[1].strip()
            elif 'Longitude:' in line:
                metadata['Longitude'] = line.split(':', 1)[1].strip()
            elif 'Year' in line:
                colnames = [i.strip() for i in line.split(sep=',')]
        else:
            # Collect data lines
            if line.strip():
                data_lines.append(line)
        
    # Step 3: Create a data.frame
    df = pd.DataFrame([line.split() for line in data_lines], columns=colnames)
    # Step 4: Add metadata as columns to the DataFrame
    df['Primary name'] = metadata.get('Primary Name', None)
    df['Latitude'] = metadata.get('Latitude', None)
    df['Longitude'] = metadata.get('Longitude', None)
    # Step 5:  Export this to .csv
    title = metadata['Primary Name'].replace(' ', '').replace('/', '').strip().lower()
    df.to_csv(f'./{title}.csv', index=False)
    print(f"Exported {title}")
    return df

df.links.apply(scrape_data)
Code
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple

import requests
from requests import Session

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))


def download_dataset_file(
    session: Session,
    base_url: str,
    dataset_name: str,
    dataset_version: str,
    filename: str,
    directory: str,
    overwrite: bool,
) -> Tuple[bool, str]:
    # if a file from this dataset already exists, skip downloading it.
    file_path = Path(directory, filename).resolve()
    if not overwrite and file_path.exists():
        logger.info(f"Dataset file '{filename}' was already downloaded.")
        return True, filename

    endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files/{filename}/url"
    get_file_response = session.get(endpoint)

    # retrieve download URL for dataset file
    if get_file_response.status_code != 200:
        logger.warning(f"Unable to get file: {filename}")
        logger.warning(get_file_response.content)
        return False, filename

    # use download URL to GET dataset file. We don't need to set the 'Authorization' header,
    # The presigned download URL already has permissions to GET the file contents
    download_url = get_file_response.json().get("temporaryDownloadUrl")
    return download_file_from_temporary_download_url(download_url, directory, filename)


def download_file_from_temporary_download_url(download_url, directory, filename):
    try:
        with requests.get(download_url, stream=True) as r:
            r.raise_for_status()
            with open(f"{directory}/{filename}", "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
    except Exception:
        logger.exception("Unable to download file using download URL")
        return False, filename

    logger.info(f"Downloaded dataset file '{filename}'")
    return True, filename


def list_dataset_files(
    session: Session,
    base_url: str,
    dataset_name: str,
    dataset_version: str,
    params: Dict[str, str],
) -> Tuple[List[str], Dict[str, Any]]:
    logger.info(f"Retrieve dataset files with query params: {params}")

    list_files_endpoint = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files"
    list_files_response = session.get(list_files_endpoint, params=params)

    if list_files_response.status_code != 200:
        raise Exception("Unable to list initial dataset files")

    try:
        list_files_response_json = list_files_response.json()
        dataset_files = list_files_response_json.get("files")
        dataset_filenames = list(map(lambda x: x.get("filename"), dataset_files))
        return dataset_filenames, list_files_response_json
    except Exception as e:
        logger.exception(e)
        raise Exception(e)


def get_max_worker_count(filesizes):
    size_for_threading = 10_000_000  # 10 MB
    average = sum(filesizes) / len(filesizes)
    # to prevent downloading multiple half files in case of a network failure with big files
    if average > size_for_threading:
        threads = 1
    else:
        threads = 10
    return threads


async def main():
    api_key = "eyJvcmciOiI1ZTU1NGUxOTI3NGE5NjAwMDEyYTNlYjEiLCJpZCI6ImE1OGI5NGZmMDY5NDRhZDNhZjFkMDBmNDBmNTQyNjBkIiwiaCI6Im11cm11cjEyOCJ9"
    dataset_name = "antieke_reeksen"
    dataset_version = "v1.0"
    base_url = "https://api.dataplatform.knmi.nl/open-data/v1"
    # When set to True, if a file with the same name exists the output is written over the file.
    # To prevent unnecessary bandwidth usage, leave it set to False.
    overwrite = False

    download_directory = "./data/"

    # Make sure to send the API key with every HTTP request
    session = requests.Session()
    session.headers.update({"Authorization": api_key})

    # Verify that the download directory exists
    if not Path(download_directory).is_dir() or not Path(download_directory).exists():
        raise Exception(f"Invalid or non-existing directory: {download_directory}")

    filenames = []
    max_keys = 500
    next_page_token = None
    file_sizes = []
    # Use the API to get a list of all dataset filenames
    while True:
        # Retrieve dataset files after given filename
        dataset_filenames, response_json = list_dataset_files(
            session,
            base_url,
            dataset_name,
            dataset_version,
            {"maxKeys": f"{max_keys}", "nextPageToken": next_page_token},
        )
        file_sizes.extend(file["size"] for file in response_json.get("files"))
        # Store filenames
        filenames += dataset_filenames

        # If the result is not truncated, we retrieved all filenames
        next_page_token = response_json.get("nextPageToken")
        if not next_page_token:
            logger.info("Retrieved names of all dataset files")
            break

    logger.info(f"Number of files to download: {len(filenames)}")

    worker_count = get_max_worker_count(file_sizes)
    loop = asyncio.get_event_loop()

    # Allow up to 10 separate threads to download dataset files concurrently
    executor = ThreadPoolExecutor(max_workers=worker_count)
    futures = []

    # Create tasks that download the dataset files
    for dataset_filename in filenames:
        # Create future for dataset file
        future = loop.run_in_executor(
            executor,
            download_dataset_file,
            session,
            base_url,
            dataset_name,
            dataset_version,
            dataset_filename,
            download_directory,
            overwrite,
        )
        futures.append(future)

    # Wait for all tasks to complete and gather the results
    future_results = await asyncio.gather(*futures)
    logger.info(f"Finished '{dataset_name}' dataset download")

    failed_downloads = list(filter(lambda x: not x[0], future_results))

    if len(failed_downloads) > 0:
        logger.warning("Failed to download the following dataset files:")
        logger.warning(list(map(lambda x: x[1], failed_downloads)))


if __name__ == "__main__":
    asyncio.run(main())

Extensions

If you have other variables at your disposal for the entire grid, that is, including at the points you want to interpolate, you could potentially extend the model with covariates:

\[ Z(s) = \sum_{j=0}^p \beta_j X_j (s) + e(s) \]

In this case, the strategy boils down to modeling the spatial covariance of the residuals \(e(s)\), conditional on the covariates. Similarly, we can potentially extend the covariance structure by allowing the spatial covariance to be a function of time:

\[ Z(s, t) = \mu + e(s, t) \]

In its simplest form, this boils down to estimating a separate variogram for each discrete time point \(t\) and solving the kriging system using that particular variogram; more generally, one fits a joint spatio-temporal variogram (such as the sumMetric model in the R code above) and solves a single space-time kriging system, which is what gstat::krigeST does.
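
Returning to the covariate extension, a rough sketch of how this looks with gstat is below (kriging with the covariate in the trend, sometimes called universal kriging or kriging with an external drift); the simulated data, the elev covariate, and the variogram starting values are purely illustrative assumptions:

Code
# Sketch of kriging with a covariate known on the full grid (universal kriging /
# external drift). The data, the 'elev' covariate, and the variogram starting
# values are purely illustrative assumptions.
library(sp)
library(gstat)

set.seed(1)
d <- data.frame(x = runif(200), y = runif(200), elev = runif(200))
d$z <- 2 * d$elev + rnorm(200, sd = 0.5)
coordinates(d) <- ~x+y

grd <- expand.grid(x = seq(0, 1, 0.05), y = seq(0, 1, 0.05))
grd$elev <- runif(nrow(grd))   # the covariate must be available at prediction locations
coordinates(grd) <- ~x+y
gridded(grd) <- TRUE

# residual variogram of z after the trend z ~ elev, then krige with the covariate
v_res <- fit.variogram(variogram(z ~ elev, d), vgm(0.3, "Exp", 0.3, 0.05))
uk    <- krige(z ~ elev, d, grd, model = v_res)
spplot(uk["var1.pred"])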

Conclusion

I have attempted to underpin basic geospatial interpolation models and explain the logic on which they’re based. I hope this was useful as a more mathematical underpinning of methods that seem arbitrary at first sight. If you have any recommendations or good texts on this subject that might be interesting, make sure to let me know. Thanks for reading!