"""
Pre-process ADS-B data and create a pandas DataFrame.
"""
__author__ = "Liam Megill"
__email__ = "liam.megill@dlr.de"
__license__ = "Apache License 2.0"
# imports
import pandas as pd
import openap
[docs]
def create_dataframe(
    data: dict,
    metadata: dict,
    source: str,
) -> pd.DataFrame:
    """
    Build a standardised DataFrame from raw ADS-B data and its metadata so
    that it can be loaded into traffic.

    The trace is turned into a DataFrame, its columns are standardised for
    the given source, rows missing altitude, groundspeed or vertical rate are
    dropped, metadata columns are added and the dtypes are converted to the
    pyarrow backend.

    Args:
        data (dict): Parsed JSON data, including 'trace'.
        metadata (dict): Associated metadata like ICAO, registration,
            timestamp.
        source (str): Source of the ADS-B data (e.g., "adsb_exchange").

    Returns:
        pd.DataFrame: A standardised, processed DataFrame.

    Raises:
        ValueError: If 'data' is not a dict with a 'trace' key, if 'metadata'
            is not a dict, or if 'source' is not supported.
    """
    # pre-conditions, checked in a fixed order: data, metadata, source
    has_trace = isinstance(data, dict) and "trace" in data
    if not has_trace:
        raise ValueError(
            "Invalid input: 'data' must be a dict containing a 'trace' key."
        )
    if not isinstance(metadata, dict):
        raise ValueError("Invalid input: 'metadata' must be a dict.")
    supported_sources = ("adsb_exchange",)
    if source not in supported_sources:
        raise ValueError("Unsupported ADS-B source.")

    # TODO: building the raw frame may need to be source-specific
    raw = pd.DataFrame(data["trace"])
    standardised = standardise_columns(raw, source)

    # keep only rows for which all of these values are available
    required = ["altitude", "groundspeed", "vertical_rate"]
    complete = standardised.dropna(subset=required, ignore_index=True).copy()

    enriched = add_metadata_columns(complete, metadata, source)

    # convert dtypes to the pyarrow backend
    return enriched.convert_dtypes(dtype_backend="pyarrow")
[docs]
def standardise_columns(df: pd.DataFrame, source: str) -> pd.DataFrame:
    """
    Standardise column names, order and values of a raw trace DataFrame for
    internal consistency.

    For "adsb_exchange" the trace columns are identified by position. The
    column labels of the passed ``df`` are renamed in place; the returned
    DataFrame is a new object holding only the standardised columns plus a
    derived "pressure" column.

    Args:
        df (pd.DataFrame): Raw trace DataFrame with at least 14 columns.
        source (str): Source of the ADS-B data (e.g., "adsb_exchange").

    Returns:
        pd.DataFrame: New DataFrame with the columns "dtime", "latitude",
            "longitude", "altitude", "groundspeed", "track", "flags",
            "vertical_rate" and "pressure".

    Raises:
        ValueError: If the source is unknown or unsupported, or if ``df`` has
            fewer columns than the source format requires.
    """
    if source != "adsb_exchange":
        raise ValueError(f"Unsupported source: {source}")

    # positional layout of an adsb_exchange trace row
    trace_columns = [
        "dtime", "latitude", "longitude", "altitude", "groundspeed",
        "track", "flags", "vertical_rate", "extra_data", "source",
        "col1", "col2", "col3", "col4",
    ]
    n_kept = 8  # leading columns, "dtime" through "vertical_rate"

    # check input
    expected_columns = len(trace_columns)
    if df.shape[1] < expected_columns:
        raise ValueError(
            f"Expected {expected_columns} columns, got {df.shape[1]}"
        )

    # identify columns: name the leading ones and keep the labels of any
    # trailing extras, so frames wider than the expected layout pass the
    # check above without failing on a label-count mismatch here
    df.columns = trace_columns + list(df.columns[expected_columns:])

    # select by position so a clashing label among the extras cannot
    # duplicate one of the kept columns
    df = df.iloc[:, :n_kept].copy()

    # wrap longitude into [0, 360)
    df["longitude"] = df["longitude"] % 360.0

    # altitude may hold the string "ground"; map it to 0.0 and cast to float
    with pd.option_context("future.no_silent_downcasting", True):
        df["altitude"] = df["altitude"].replace("ground", 0.0).astype(float)

    # calculate pressure; altitude is scaled by openap.aero.ft first
    # (presumably feet to metres -- confirm against the openap docs)
    df["pressure"] = openap.aero.pressure(df["altitude"] * openap.aero.ft)

    return df