Skip to content

💽 Data Pipeline API

The node_fdm.data namespace handles the transformation of raw flight records into training-ready tensors.

Its primary responsibilities include applying architecture-specific preprocessing hooks via the Flight Processor, normalizing features using robust statistics, and managing efficient sequence loading from disk through the Dataset and Loader utilities.


📘 Class Reference

Flight Processor

flight_processor

Flight preprocessing pipeline for converting raw data into model-ready columns.

FlightProcessor

Flexible flight data processor with a customizable post-processing hook.

Source code in src/node_fdm/data/flight_processor.py
class FlightProcessor:
    """Flexible flight data processor with a customizable post-processing hook."""

    def __init__(
        self,
        model_cols: Tuple[Any, Any, Any, Any, Any],
        custom_processing_fn: Optional[Callable[[Any], Any]] = None,
    ) -> None:
        """Initialize the processor with model column configuration and hooks.

        Args:
            model_cols: Tuple of model column groups (state, control, env, etc.).
            custom_processing_fn: Optional callable applied after base processing; uses Any for flexibility with DataFrame-like inputs.
        """
        (
            self.x_cols,
            self.u_cols,
            self.e0_cols,
            self.e_cols,
            _,  # fifth group intentionally ignored (see note below)
        ) = model_cols
        # Derivative columns are always derived from the state columns; the
        # fifth entry of model_cols used to be unpacked into self.dx_cols and
        # then immediately overwritten by this line (a dead store).
        self.dx_cols = [col.derivative for col in self.x_cols]
        self.custom_processing_fn = custom_processing_fn

    # ------------------------------------------------------------------
    def process_flight(self, df: Any) -> DataFrameWrapper:
        """Run the main flight preprocessing pipeline.

        Args:
            df: DataFrame-like object containing raw flight data. Uses Any for flexibility across wrappers.

        Returns:
            Processed DataFrameWrapper filtered to model-relevant columns.
        """
        df = DataFrameWrapper(df)

        # Standardize known raw columns: apply the unit conversion declared on
        # each Column definition and store the result under the gold name.
        for col in Column.get_all():
            raw_col = col.raw_name
            gold_col = col.col_name
            if raw_col is not None and raw_col in df.columns:
                df[gold_col] = col.unit.convert(df[raw_col])

        # First-order finite difference as the state derivative; bfill() fills
        # the leading NaN produced by diff(1) with the first valid difference.
        for col in self.x_cols:
            df[col.derivative] = df[col].diff(1).bfill()

        if self.custom_processing_fn is not None:
            df = self.custom_processing_fn(df)

        return df[self.x_cols + self.u_cols + self.e0_cols + self.e_cols + self.dx_cols]

__init__(model_cols, custom_processing_fn=None)

Initialize the processor with model column configuration and hooks.

Parameters:

Name Type Description Default
model_cols Tuple[Any, Any, Any, Any, Any]

Tuple of model column groups (state, control, env, etc.).

required
custom_processing_fn Optional[Callable[[Any], Any]]

Optional callable applied after base processing; uses Any for flexibility with DataFrame-like inputs.

None
Source code in src/node_fdm/data/flight_processor.py
def __init__(
    self,
    model_cols: Tuple[Any, Any, Any, Any, Any],
    custom_processing_fn: Optional[Callable[[Any], Any]] = None,
) -> None:
    """Initialize the processor with model column configuration and hooks.

    Args:
        model_cols: Tuple of model column groups (state, control, env, etc.).
        custom_processing_fn: Optional callable applied after base processing; uses Any for flexibility with DataFrame-like inputs.
    """
    (
        self.x_cols,
        self.u_cols,
        self.e0_cols,
        self.e_cols,
        _,  # fifth group intentionally ignored (see note below)
    ) = model_cols
    # Derivative columns are always derived from the state columns; the fifth
    # entry of model_cols used to be unpacked into self.dx_cols and then
    # immediately overwritten by this line (a dead store).
    self.dx_cols = [col.derivative for col in self.x_cols]
    self.custom_processing_fn = custom_processing_fn

process_flight(df)

Run the main flight preprocessing pipeline.

Parameters:

Name Type Description Default
df Any

DataFrame-like object containing raw flight data. Uses Any for flexibility across wrappers.

required

Returns:

Type Description
DataFrameWrapper

Processed DataFrameWrapper filtered to model-relevant columns.

Source code in src/node_fdm/data/flight_processor.py
def process_flight(self, df: Any) -> DataFrameWrapper:
    """Run the main flight preprocessing pipeline.

    Args:
        df: DataFrame-like object containing raw flight data. Uses Any for flexibility across wrappers.

    Returns:
        Processed DataFrameWrapper filtered to model-relevant columns.
    """

    df = DataFrameWrapper(df)

    # Standardize known raw columns: apply the unit conversion declared on
    # each Column definition and store the result under the gold name.
    for col in Column.get_all():
        raw_col = col.raw_name
        gold_col = col.col_name
        if raw_col is not None and raw_col in df.columns:
            df[gold_col] = col.unit.convert(df[raw_col])

    # First-order finite difference as the state derivative; bfill() fills
    # the leading NaN produced by diff(1) with the first valid difference.
    for col in self.x_cols:
        df[col.derivative] = df[col].diff(1).bfill()

    if self.custom_processing_fn is not None:
        df = self.custom_processing_fn(df)

    return df[self.x_cols + self.u_cols + self.e0_cols + self.e_cols + self.dx_cols]

Dataset

dataset

Dataset utilities for loading and segmenting flight data sequences.

SeqDataset

Bases: Dataset

Sequence dataset that loads flight segments for model training.

Source code in src/node_fdm/data/dataset.py
class SeqDataset(Dataset):
    """Sequence dataset that loads flight segments for model training."""

    def __init__(
        self,
        flights_path_list: Sequence[str],
        model_cols: Tuple[Any, Any, Any, Any, Any],
        seq_len: int = 60,
        shift: int = 60,
        n_jobs: int = 35,
        load_parallel: bool = True,
        custom_fn: Tuple[
            Optional[Callable[[pd.DataFrame], pd.DataFrame]],
            Optional[Callable[..., bool]],
        ] = (None, None),
    ) -> None:
        """Initialize the dataset with flight paths and model column definitions.

        Args:
            flights_path_list: Iterable of flight parquet file paths.
            model_cols: Tuple containing model column groups (state, control, env, etc.).
            seq_len: Sequence length to extract from each flight.
            shift: Step size when sliding the sequence window.
            n_jobs: Number of parallel workers to use when loading flights.
            load_parallel: Whether to load flights concurrently.
            custom_fn: Tuple of optional processing and segment-filtering callables.
        """
        self.flights_path_list = flights_path_list
        self.shift = shift
        self.seq_len = seq_len
        self.x_cols, self.u_cols, self.e0_cols, self.e_cols, self.dx_cols = model_cols
        # The class reads deriv_cols (derived from the state columns) rather
        # than the dx_cols group above; dx_cols is kept for compatibility.
        self.deriv_cols = [col.derivative for col in self.x_cols]
        self.model_cols = model_cols
        self.load_parallel = load_parallel
        self.n_jobs = n_jobs
        custom_processing_fn, custom_segment_filtering_fn = custom_fn
        self.processor = FlightProcessor(
            model_cols, custom_processing_fn=custom_processing_fn
        )
        self.custom_segment_filtering_fn = custom_segment_filtering_fn
        # Eagerly loads every flight and builds the sequence cache up front.
        self.init_flight_date()

    def init_flight_date(self) -> None:
        """Load all flights, build sequence cache, and compute aggregate statistics.

        Populates internal sequence list and per-column statistics used for
        normalization. (Name keeps the historical "date" spelling — "data" was
        likely intended — to avoid breaking external callers.)

        Raises:
            ValueError: If no valid (nan-free, filter-passing) sequence could
                be extracted from any of the provided flights.
        """
        if self.load_parallel:
            results = Parallel(n_jobs=self.n_jobs)(
                delayed(self.process_one_flight)(flight)
                for flight in tqdm(self.flights_path_list, desc="Loading flights")
            )
        else:
            results = [
                self.process_one_flight(flight)
                for flight in tqdm(self.flights_path_list, desc="Loading flights")
            ]

        # Flatten the per-flight lists into a single sequence cache.
        self.sequences = [seq for flight_seqs in results for seq in flight_seqs]

        if not self.sequences:
            # np.concatenate would otherwise fail below with a cryptic
            # "need at least one array to concatenate" — fail loudly instead.
            raise ValueError(
                "No valid sequences could be extracted from the provided flights."
            )

        # Stack every sequence into one (rows, features) matrix; the column
        # order must match all_cols below.
        all_data = np.concatenate(
            [
                np.concatenate([seq[0], seq[1], seq[2], seq[3]], axis=1)
                for seq in self.sequences
            ],
            axis=0,
        )

        all_cols = (
            self.x_cols + self.u_cols + self.e0_cols + self.e_cols + self.deriv_cols
        )

        # Robust per-column statistics consumed downstream for normalization.
        self.stats_dict = {}
        for i, col in enumerate(all_cols):
            vals = all_data[:, i].astype(float)
            self.stats_dict[col] = {
                "mean": vals.mean(),
                "std": vals.std() + 1e-6,  # epsilon guards against zero std
                "max": np.percentile(np.abs(vals), 99.5),  # robust to outliers
            }

    def process_one_flight(
        self, flight_path: str
    ) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]:
        """Process a single flight file into clean, nan-free sequences.

        Args:
            flight_path: Path to a flight parquet file.

        Returns:
            List of tuples containing state, control, environment, and derivative arrays.
        """
        f = self.read_flight(flight_path)
        seqs = []
        N = len(f)
        # Flights shorter than one full window are skipped entirely.
        if N > self.seq_len:
            x_seq = f[self.x_cols].values.astype(np.float32)
            u_seq = f[self.u_cols].values.astype(np.float32)
            e_seq = f[self.e0_cols + self.e_cols].values.astype(np.float32)
            dx_seq = f[self.deriv_cols].values.astype(np.float32)

            for start in range(0, N - self.seq_len + 1, self.shift):
                keep_segment = True
                if self.custom_segment_filtering_fn is not None:
                    keep_segment = self.custom_segment_filtering_fn(
                        f, start, self.seq_len
                    )
                # Total NaN count across all four array groups for this window.
                window_nans = sum(
                    np.isnan(arr[start : start + self.seq_len]).sum()
                    for arr in (x_seq, u_seq, e_seq, dx_seq)
                )
                # Logical `and` instead of the original bitwise `&`: clearer
                # intent and short-circuits; same outcome on boolean operands.
                if keep_segment and window_nans == 0:
                    seqs.append(
                        (
                            x_seq[start : start + self.seq_len],
                            u_seq[start : start + self.seq_len],
                            e_seq[start : start + self.seq_len],
                            dx_seq[start : start + self.seq_len],
                        )
                    )
        return seqs

    def __len__(self) -> int:
        """Return number of available sequences.

        Returns:
            Count of cached flight sequences.
        """
        return len(self.sequences)

    def __getitem__(
        self, idx: int
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Return tensors for a specific sequence index.

        Args:
            idx: Index of the sequence to retrieve.

        Returns:
            Tuple of tensors for state, control, environment, and derivative slices.
        """
        x_seq, u_seq, e_seq, dxdt_seq = self.sequences[idx]
        # torch.tensor(...) copies the cached numpy arrays, so consumers may
        # mutate the returned tensors without corrupting the cache.
        return (
            torch.tensor(x_seq, dtype=torch.float32),
            torch.tensor(u_seq, dtype=torch.float32),
            torch.tensor(e_seq, dtype=torch.float32),
            torch.tensor(dxdt_seq, dtype=torch.float32),
        )

    def read_flight(self, flight_path: str) -> pd.DataFrame:
        """Read a flight parquet file and apply base processing.

        Args:
            flight_path: Path to a parquet file containing flight data.

        Returns:
            Processed DataFrame with standardized columns.
        """
        f = pd.read_parquet(flight_path)
        return self.processor.process_flight(f)

    def get_full_flight(
        self, flight_idx: int
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, pd.DataFrame]:
        """Return full arrays for a specific flight index.

        Args:
            flight_idx: Index of the flight in the provided flight list.

        Returns:
            Tuple of state, control, environment, derivative arrays, and the full DataFrame.
        """
        flight_path = self.flights_path_list[flight_idx]
        f = self.read_flight(flight_path)
        x_seq = f[self.x_cols].values.astype(np.float32)
        u_seq = f[self.u_cols].values.astype(np.float32)
        e0_seq = f[self.e0_cols + self.e_cols].values.astype(np.float32)
        dx_seq = f[self.deriv_cols].values.astype(np.float32)
        return x_seq, u_seq, e0_seq, dx_seq, f

__getitem__(idx)

Return tensors for a specific sequence index.

Parameters:

Name Type Description Default
idx int

Index of the sequence to retrieve.

required

Returns:

Type Description
Tuple[Tensor, Tensor, Tensor, Tensor]

Tuple of tensors for state, control, environment, and derivative slices.

Source code in src/node_fdm/data/dataset.py
def __getitem__(
    self, idx: int
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    """Return tensors for a specific sequence index.

    Args:
        idx: Index of the sequence to retrieve.

    Returns:
        Tuple of tensors for state, control, environment, and derivative slices.
    """
    x_seq, u_seq, e_seq, dxdt_seq = self.sequences[idx]
    # torch.tensor(...) copies the cached numpy arrays, so consumers may
    # mutate the returned tensors without corrupting the cache.
    return (
        torch.tensor(x_seq, dtype=torch.float32),
        torch.tensor(u_seq, dtype=torch.float32),
        torch.tensor(e_seq, dtype=torch.float32),
        torch.tensor(dxdt_seq, dtype=torch.float32),
    )

__init__(flights_path_list, model_cols, seq_len=60, shift=60, n_jobs=35, load_parallel=True, custom_fn=(None, None))

Initialize the dataset with flight paths and model column definitions.

Parameters:

Name Type Description Default
flights_path_list Sequence[str]

Iterable of flight parquet file paths.

required
model_cols Tuple[Any, Any, Any, Any, Any]

Tuple containing model column groups (state, control, env, etc.).

required
seq_len int

Sequence length to extract from each flight.

60
shift int

Step size when sliding the sequence window.

60
n_jobs int

Number of parallel workers to use when loading flights.

35
load_parallel bool

Whether to load flights concurrently.

True
custom_fn Tuple[Optional[Callable[[DataFrame], DataFrame]], Optional[Callable[..., bool]]]

Tuple of optional processing and segment-filtering callables.

(None, None)
Source code in src/node_fdm/data/dataset.py
def __init__(
    self,
    flights_path_list: Sequence[str],
    model_cols: Tuple[Any, Any, Any, Any, Any],
    seq_len: int = 60,
    shift: int = 60,
    n_jobs: int = 35,
    load_parallel: bool = True,
    custom_fn: Tuple[
        Optional[Callable[[pd.DataFrame], pd.DataFrame]],
        Optional[Callable[..., bool]],
    ] = (None, None),
) -> None:
    """Initialize the dataset with flight paths and model column definitions.

    Args:
        flights_path_list: Iterable of flight parquet file paths.
        model_cols: Tuple containing model column groups (state, control, env, etc.).
        seq_len: Sequence length to extract from each flight.
        shift: Step size when sliding the sequence window.
        n_jobs: Number of parallel workers to use when loading flights.
        load_parallel: Whether to load flights concurrently.
        custom_fn: Tuple of optional processing and segment-filtering callables.
    """
    self.flights_path_list = flights_path_list
    self.shift = shift
    self.seq_len = seq_len
    self.x_cols, self.u_cols, self.e0_cols, self.e_cols, self.dx_cols = model_cols
    # NOTE(review): the rest of the class reads deriv_cols (derived from the
    # state columns), not the dx_cols group unpacked just above.
    self.deriv_cols = [col.derivative for col in self.x_cols]
    self.model_cols = model_cols
    self.load_parallel = load_parallel
    self.n_jobs = n_jobs
    custom_processing_fn, custom_segment_filtering_fn = custom_fn
    self.processor = FlightProcessor(
        model_cols, custom_processing_fn=custom_processing_fn
    )
    self.custom_segment_filtering_fn = custom_segment_filtering_fn
    # Eagerly loads every flight and builds the sequence cache up front.
    self.init_flight_date()

__len__()

Return number of available sequences.

Returns:

Type Description
int

Count of cached flight sequences.

Source code in src/node_fdm/data/dataset.py
def __len__(self) -> int:
    """Return number of available sequences.

    Returns:
        Count of cached flight sequences.
    """
    # Sequences are cached eagerly at construction time, so this is O(1).
    return len(self.sequences)

get_full_flight(flight_idx)

Return full arrays for a specific flight index.

Parameters:

Name Type Description Default
flight_idx int

Index of the flight in the provided flight list.

required

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray, ndarray, DataFrame]

Tuple of state, control, environment, derivative arrays, and the full DataFrame.

Source code in src/node_fdm/data/dataset.py
def get_full_flight(
    self, flight_idx: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, pd.DataFrame]:
    """Return full arrays for a specific flight index.

    Args:
        flight_idx: Index of the flight in the provided flight list.

    Returns:
        Tuple of state, control, environment, derivative arrays, and the full DataFrame.
    """
    flight_path = self.flights_path_list[flight_idx]
    # Re-reads and re-processes the flight from disk (not served from cache).
    f = self.read_flight(flight_path)
    x_seq = f[self.x_cols].values.astype(np.float32)
    u_seq = f[self.u_cols].values.astype(np.float32)
    e0_seq = f[self.e0_cols + self.e_cols].values.astype(np.float32)
    dx_seq = f[self.deriv_cols].values.astype(np.float32)
    return x_seq, u_seq, e0_seq, dx_seq, f

init_flight_date()

Load all flights, build sequence cache, and compute aggregate statistics.

Populates internal sequence list and per-column statistics used for normalization.

Source code in src/node_fdm/data/dataset.py
def init_flight_date(self) -> None:
    """Load all flights, build sequence cache, and compute aggregate statistics.

    Populates internal sequence list and per-column statistics used for normalization.
    """
    # NOTE(review): method name reads like a typo for "init_flight_data";
    # kept as-is since external callers may rely on it.
    if self.load_parallel:
        results = Parallel(n_jobs=self.n_jobs)(
            delayed(self.process_one_flight)(
                flight,
            )
            for flight in tqdm(self.flights_path_list, desc="Loading flights")
        )
    else:
        results = [
            self.process_one_flight(flight)
            for flight in tqdm(self.flights_path_list, desc="Loading flights")
        ]

    # Flatten the per-flight sequence lists into a single cache.
    self.sequences = []
    for seqs in results:
        self.sequences.extend(seqs)

    # NOTE(review): if no sequence survived filtering, np.concatenate raises
    # a generic "need at least one array" ValueError — a clearer message
    # would help here.
    all_data = np.concatenate(
        [
            np.concatenate([seq[0], seq[1], seq[2], seq[3]], axis=1)
            for seq in self.sequences
        ],
        axis=0,
    )

    # Column order must match the horizontal concatenation order above.
    all_cols = (
        self.x_cols + self.u_cols + self.e0_cols + self.e_cols + self.deriv_cols
    )

    self.stats_dict = dict()

    for i, col in enumerate(all_cols):
        vals = all_data[:, i].astype(float)
        self.stats_dict[col] = {
            "mean": vals.mean(),
            "std": vals.std() + 1e-6,  # epsilon guards against zero std
            "max": np.percentile(np.abs(vals), 99.5),  # robust to outliers
        }

process_one_flight(flight_path)

Process a single flight file into clean, nan-free sequences.

Parameters:

Name Type Description Default
flight_path str

Path to a flight parquet file.

required

Returns:

Type Description
List[Tuple[ndarray, ndarray, ndarray, ndarray]]

List of tuples containing state, control, environment, and derivative arrays.

Source code in src/node_fdm/data/dataset.py
def process_one_flight(
    self, flight_path: str
) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]:
    """Process a single flight file into clean, nan-free sequences.

    Args:
        flight_path: Path to a flight parquet file.

    Returns:
        List of tuples containing state, control, environment, and derivative arrays.
    """
    f = self.read_flight(flight_path)
    seqs = []
    N = len(f)
    # Flights shorter than one full window are skipped entirely.
    if N > self.seq_len:
        x_seq = f[self.x_cols].values.astype(np.float32)
        u_seq = f[self.u_cols].values.astype(np.float32)
        e_seq = f[self.e0_cols + self.e_cols].values.astype(np.float32)
        dx_seq = f[self.deriv_cols].values.astype(np.float32)

        for start in range(0, N - self.seq_len + 1, self.shift):
            custom_segment_filtering_bool = True
            if self.custom_segment_filtering_fn is not None:
                custom_segment_filtering_bool = self.custom_segment_filtering_fn(
                    f, start, self.seq_len
                )
            # Total NaN count across all four array groups for this window.
            nans = sum(
                [
                    np.isnan(seq[start : start + self.seq_len]).sum()
                    for seq in [x_seq, u_seq, e_seq, dx_seq]
                ]
            )
            # NOTE(review): bitwise & works here since both operands are
            # boolean-like, but logical `and` would be the idiomatic choice.
            if (custom_segment_filtering_bool) & (nans == 0):
                seqs.append(
                    (
                        x_seq[start : start + self.seq_len],
                        u_seq[start : start + self.seq_len],
                        e_seq[start : start + self.seq_len],
                        dx_seq[start : start + self.seq_len],
                    )
                )
    return seqs

read_flight(flight_path)

Read a flight parquet file and apply base processing.

Parameters:

Name Type Description Default
flight_path str

Path to a parquet file containing flight data.

required

Returns:

Type Description
DataFrame

Processed DataFrame with standardized columns.

Source code in src/node_fdm/data/dataset.py
def read_flight(self, flight_path: str) -> pd.DataFrame:
    """Read a flight parquet file and apply base processing.

    Args:
        flight_path: Path to a parquet file containing flight data.

    Returns:
        Processed DataFrame with standardized columns.
    """
    f = pd.read_parquet(flight_path)
    # Delegates unit conversion, derivative computation, and column
    # filtering to the shared FlightProcessor instance.
    return self.processor.process_flight(f)

Loader

loader

Helper for building train/validation datasets.

get_train_val_data(data_df, model_cols, shift=60, seq_len=60, custom_fn=(None, None), load_parallel=True, train_val_num=(5000, 500))

Create training and validation datasets from a labeled file list.

Parameters:

Name Type Description Default
data_df DataFrame

DataFrame containing file paths with a split column.

required
model_cols

Tuple containing model column groups.

required
shift int

Window shift used when generating sequences.

60
seq_len int

Sequence length for each sample.

60
custom_fn Tuple[Optional[Callable[[DataFrame], DataFrame]], Optional[Callable[..., bool]]]

Tuple of optional processing and segment-filtering callables.

(None, None)
load_parallel bool

Whether to load flights concurrently.

True
train_val_num Tuple[int, int]

Maximum number of train and validation files to load.

(5000, 500)

Returns:

Type Description
Tuple[SeqDataset, SeqDataset]

Tuple of training and validation SeqDataset instances.

Source code in src/node_fdm/data/loader.py
def get_train_val_data(
    data_df: pd.DataFrame,
    model_cols: Tuple[Any, Any, Any, Any, Any],
    shift: int = 60,
    seq_len: int = 60,
    custom_fn: Tuple[
        Optional[Callable[[pd.DataFrame], pd.DataFrame]], Optional[Callable[..., bool]]
    ] = (None, None),
    load_parallel: bool = True,
    train_val_num: Tuple[int, int] = (5000, 500),
) -> Tuple[SeqDataset, SeqDataset]:
    """Create training and validation datasets from a labeled file list.

    Args:
        data_df: DataFrame containing file paths with a `split` column.
        model_cols: Tuple containing model column groups.
        shift: Window shift used when generating sequences.
        seq_len: Sequence length for each sample.
        custom_fn: Tuple of optional processing and segment-filtering callables.
        load_parallel: Whether to load flights concurrently.
        train_val_num: Maximum number of train and validation files to load.

    Returns:
        Tuple of training and validation SeqDataset instances.
    """
    # Bracket/loc access instead of attribute access (`data_df.split`):
    # attribute access silently resolves to a DataFrame attribute or method
    # of the same name instead of the column if one exists.
    train_files = data_df.loc[data_df["split"] == "train", "filepath"].tolist()
    validation_files = data_df.loc[data_df["split"] == "val", "filepath"].tolist()

    max_train, max_val = train_val_num
    train_dataset = SeqDataset(
        train_files[:max_train],
        model_cols,
        seq_len=seq_len,
        shift=shift,
        custom_fn=custom_fn,
        load_parallel=load_parallel,
    )
    val_dataset = SeqDataset(
        validation_files[:max_val],
        model_cols,
        seq_len=seq_len,
        shift=shift,
        custom_fn=custom_fn,
        load_parallel=load_parallel,
    )
    return train_dataset, val_dataset