Streaming GENIE Model

The StreamingGENIE class extends both GENIE and StreamingBase to provide memory-efficient processing of gene-environment interaction analysis for large-scale genomic data.

Model Types

Similar to the GENIE model, the StreamingGENIE model supports three streaming configurations:

  • G: Basic genetic effects with streaming

  • G+GxE: Genetic and gene-environment interactions with streaming

  • G+GxE+NxE: Full model with noise-environment interactions in streaming mode

Class Inheritance

class StreamingGENIE(GENIE, StreamingBase):
    """Memory-efficient implementation of GENIE."""

Key Methods

pre_compute_jackknife_bin(j, all_gen, worker_num)

Pre-computes statistics for each jackknife sample:

Parameters:
  • j (int) – Jackknife sample index

  • all_gen (list) – List of genotype matrices for each bin

  • worker_num (int) – Worker process identifier

def pre_compute_jackknife_bin(j, all_gen, worker_num):
    # Process genetic effects
    for k, X_kj in enumerate(all_gen):
        X_kj = standardize_geno(X_kj)
        M[j][k] = M[num_jack][k] - X_kj.shape[1]

        # Compute genetic statistics
        for b in range(num_random_vec):
            XXz[k][worker_num][b] += compute_XXz(b, X_kj)

            if use_cov:
                UXXz[k][worker_num][b] += compute_UXXz(XXz[k][worker_num][b])
                XXUz[k][worker_num][b] += compute_XXUz(b, X_kj)

        yXXy[k][worker_num] += compute_yXXy(X_kj, pheno)[0][0]

    # Process GxE interactions if enabled
    if genie_model in ["G+GxE", "G+GxE+NxE"]:
        for e in range(num_env):
            for k, X_kj in enumerate(all_gen):
                X_kj_gxe = elem_mul(X_kj, env[:, e].reshape(-1, 1))
                k_gxe = (e + 1) * k + num_bin

                for b in range(num_random_vec):
                    XXz[k_gxe][worker_num][b] += compute_XXz(b, X_kj_gxe)

                    if use_cov:
                        UXXz[k_gxe][worker_num][b] += compute_UXXz(XXz[k_gxe][worker_num][b])
                        XXUz[k_gxe][worker_num][b] += compute_XXUz(b, X_kj_gxe)

                yXXy[k_gxe][worker_num] += compute_yXXy(X_kj_gxe, pheno)[0][0]
pre_compute_jackknife_bin_pass_2(j, all_gen)

Performs second pass computation for jackknife estimates:

Parameters:
  • j (int) – Jackknife sample index

  • all_gen (list) – List of genotype matrices for each bin

def pre_compute_jackknife_bin_pass_2(j, all_gen):
    # Process genetic effects
    for k, X_kj in enumerate(all_gen):
        X_kj = standardize_geno(X_kj) if j != num_jack else 0
        process_genetic_pass_2(X_kj, k, j)

    # Process GxE interactions
    if genie_model in ["G+GxE", "G+GxE+NxE"]:
        for e in range(num_env):
            for k, X_kj in enumerate(all_gen):
                X_kj_gxe = compute_gxe_interaction(X_kj, e, j)
                process_gxe_pass_2(X_kj_gxe, k, e, j)

Other Methods to Override the Base Class

Similar to the GENIE model which override the base class’s estimate method to return also the adjusted sigma results based on the traces, the StreamingGENIE model overrides the following methods from the BaseStreaming class.

Since the estimate method calls the _estimate_worker method, we need to override the _estimate_worker method as well.

_estimate_worker(self, worker_num, method, start_j, end_j, result_queue, trace_sum):

This method is overridden because for StreamingGENIE, the heritability should be computed with the traces. Thus, the adjusted sigma estimated based on the traces is also returned.

def _estimate_worker(self, worker_num, method, start_j, end_j, result_queue, trace_sum):
    # ... existing code from the streaming base class ...
    sigma_ests_adj = []

    for j in range(self.num_jack):
      # ... existing code from the base class ...
      # Adjust the estimate by the effect of traces for heritability calculation
      sigma_est_adj = []
      for i in range(len(sigma_est)):
          sigma_est_adj.append(sigma_est[i] * T[i, self.num_estimates])

    for i in range(len(sigma_est)):
        sigma_est_adj.append(sigma_est[i] * T[i, self.num_estimates])
    sigma_ests_adj.append(sigma_est_adj)
    # Put the sigma_ests_adj also in the result_queue
    if self.multiprocessing:
        result_queue.put((worker_num, sigma_ests, sigma_ests_adj)) # ensure in order
    else:
        result_queue.extend((sigma_ests, sigma_ests_adj))

def estimate(self, method: str = "lstsq"):
  # ... existing code from the streaming base class ...

  # ... Unpack the results by also including the all_results_adj
      all_results = [item for _, result, _ in results for item in result]
      all_results_adj = [item for _, _, result in results for item in result

  # ... existing code from the streaming base class ...
  # Also aggregate the results from the all_results_adj
  sigma_ests_adj = np.array(all_results_adj)
  sigma_est_jackknife_adj, sigma_ests_total_adj = sigma_ests_adj[:-1, :], sigma_ests_adj[-1, :]

  return sigma_est_jackknife, sigma_ests_total, sigma_est_jackknife_adj, sigma_ests_total_adj

UUsage Example

from pyrhe.models import StreamingGENIE

# Initialize model
streaming_genie_model = StreamingGENIE(
     genie_model="G+GxE+NxE",
    geno_file="path/to/genotype",
    annot_file="path/to/annotation",
    pheno_file="path/to/phenotype",
    cov_file="path/to/covariate",
    num_bins=10,
    num_jack=100,
    num_random_vec=10,
    num_workers=5,
    ...
)

# Run analysis
results = streaming_genie_model()

# Access results
# The outputs are automatically logged in the output file.
# In addition, you can also access the results:
print(results)
print(results['sigma_ests_total'])
# The results are stored in a dictionary. The keys are:
# - sigma_ests_total: Estimated variance components
# - sig_errs: Standard errors of variance components
# - h2_total: Heritability estimates
# - h2_errs: Standard errors of heritability
# - enrichment_total: Enrichment scores
# - enrichment_errs: Standard errors of enrichment