Source code for y0.algorithm.tian_id

"""An implementation of Tian and Pearl's identification algorithm from [tian03a]_.

.. [tikka20a] https://github.com/santikka/causaleffect/blob/master/R/compute.c.factor.R.

.. [tikka20b] https://github.com/santikka/causaleffect/blob/master/R/identify.R.

.. [tian03a] Tian J, Pearl J (2003). `On the Identification of Causal Effects
    <https://ftp.cs.ucla.edu/pub/stat_ser/R290-L.pdf>`_. Technical report, Department of
    Computer Science, University of California, Los Angeles. R-290-L
"""

import logging
from collections.abc import Collection, Sequence

from y0.dsl import (
    Distribution,
    Expression,
    Fraction,
    One,
    P,
    PopulationProbability,
    Probability,
    Product,
    Sum,
    Variable,
)
from y0.graph import NxMixedGraph

__all__ = [
    "compute_ancestral_set_q_value",
    "compute_c_factor",
    "compute_c_factor_conditioning_on_topological_predecessors",
    "compute_c_factor_marginalizing_over_topological_successors",
    "compute_q_value_of_variables_with_low_topological_ordering_indices",
    "identify_district_variables",
]

logger = logging.getLogger(__name__)


[docs] def identify_district_variables( # noqa:C901 *, input_variables: set[Variable], input_district: set[Variable], district_probability: Expression, graph: NxMixedGraph, ordering: Sequence[Variable], ) -> Expression | None: """Implement the IDENTIFY algorithm as presented in [tian03a]_ with pseudocode in [correa22a]_ (Algorithm 5). Tikka and colleagues implemented this algorithm in the R package Causal Effect ([tikka20b]_). We draw from that implementation. Their version also keeps track of the structure of calls, while this one does not. :param input_variables: The set of variables, C, for which we're checking if causal identification is possible. :param input_district: The C-component, T, containing C. :param district_probability: The expression $Q[T]$ as per [tian03a]_, Equation 34. Because T is a single district, $Q[T]$ is the "post-intervention distribution of the variables in T, under an intervention that sets all other variables to constants" (see Equation 36 of [tian03a]_). :param graph: The relevant graph. :param ordering: A list of variables in topological order that includes all variables in the graph and may contain more. :returns: An expression for $Q[C]$ in terms of $Q$, or Fail. :raises KeyError: at least one input variable is not in the input district or at least one input district variable is not in the topologically sorted list of graph variables. :raises TypeError: the subgraph of the input graph G comprised of the vertices in the input vertex set T should have only district and has more. :raises NotImplementedError: If we get to the end of the conditional, which still needs an "else" """ if input_variables.intersection(input_district) != input_variables: # if not all(v in input_district for v in input_variables): raise KeyError( "In identify_district_variables: at least one of the input variables C is not in the input district T." ) if input_district.intersection(ordering) != input_district: raise KeyError( "In identify_district_variables: at least one input district variable is not in the " "topologically sorted variable list." ) district_subgraph = graph.subgraph(input_district) # $G_{T}$ if len(district_subgraph.districts()) > 1: raise TypeError( "In identify_district_variables: the subgraph of the input graph G comprised of the" " vertices in the input vertex set T should have only district and has more." ) if (not isinstance(district_probability, Expression)) | isinstance( district_probability, Expression ) and not (isinstance(district_probability, Sum | Product | Fraction | Probability)): raise TypeError( "In identify_district_variables: the district probability must be an expression that is a " "Sum, Product, Fraction, or Probability." ) # A = Ancestors of C in $G_{T}$ ancestral_set = district_subgraph.ancestors_inclusive(input_variables) # Next, Tikka has an additional line intersecting the ancestral set with the set T in case any C was not in T, # but we raise an error in that case as a pre-processing step, so we omit that line. ordered_ancestral_set = [a for a in ordering if a in ancestral_set] if ancestral_set == input_variables: logger.debug("In identify_district_variables: A = C. Applying Lemma 3.") logger.debug(" Subgraph_probability = " + district_probability.to_latex()) return compute_ancestral_set_q_value( ancestral_set=ancestral_set, subgraph_variables=input_district, subgraph_probability=district_probability, ordering=ordering, ) elif ancestral_set == input_district: logger.debug("In identify_district_variables: A = T. Returning None (i.e., FAIL).") logger.debug(" A = " + str(input_district)) logger.debug(" T = " + str(ancestral_set)) return None elif input_variables.issubset(ancestral_set) and ancestral_set.issubset(input_district): ancestral_set_subgraph = graph.subgraph(ordered_ancestral_set) ancestral_set_subgraph_districts = list(ancestral_set_subgraph.districts()) targeted_ancestral_set_subgraph_district = set( ancestral_set_subgraph_districts[ [ input_variables.issubset(district) for district in ancestral_set_subgraph_districts ].index(True) ] ) # t_prime = [district for district in ancestral_set_subgraph_districts if # input_variables.intersect(district)==input_variables][0] # ordered_t_prime_vertices = [v for v in topo if v in t_prime] # t_one = t_prime.intersection(ancestral_set) # RC: This line is in Tikka if isinstance(district_probability, Fraction | Product | Sum): # Compute Q[A] from Lemma 3 ancestral_set_probability = compute_ancestral_set_q_value( ancestral_set=ancestral_set, subgraph_variables=input_district, subgraph_probability=district_probability, # Q[T] ordering=ordering, ) elif isinstance(district_probability, Probability): distribution = ( ordered_ancestral_set[0].joint(ordered_ancestral_set[1:]) | district_probability.parents ) if isinstance(district_probability, PopulationProbability): ancestral_set_probability = PopulationProbability( population=district_probability.population, distribution=distribution, ) else: ancestral_set_probability = P(distribution) logger.debug( "Got ancestral_set_probability. Result = " + ancestral_set_probability.to_latex() ) else: raise TypeError( "In identify_district_variables: the district probability is an expression of an unknown type." ) # Get Q[T'] by Lemma 4 or Lemma 1 logger.debug( "In identify_district_variables: about to call _compute_c_factor. Subgraph_probability = " + ancestral_set_probability.to_latex() ) targeted_ancestral_set_subgraph_district_probability = compute_c_factor( district=targeted_ancestral_set_subgraph_district, subgraph_variables=ancestral_set, subgraph_probability=ancestral_set_probability, ordering=ordering, ) logger.debug( "In identify_district_variables: about to recursively call identify_district_variables." ) logger.debug(" C = " + str(input_variables)) logger.debug(" T' = " + str(targeted_ancestral_set_subgraph_district)) logger.debug(" Q[T'] =" + str(targeted_ancestral_set_subgraph_district_probability)) logger.debug(" graph nodes = " + str(list(graph.nodes()))) logger.debug(" topo = " + str(ordering)) return identify_district_variables( input_variables=input_variables, input_district=targeted_ancestral_set_subgraph_district, district_probability=targeted_ancestral_set_subgraph_district_probability, graph=graph, ordering=ordering, ) else: raise NotImplementedError
[docs] def compute_c_factor_conditioning_on_topological_predecessors( *, district: Collection[Variable], graph_probability: Probability, ordering: Sequence[Variable], ) -> Expression: r"""Compute the Q value associated with the C-component (district) in a graph as per [tian03a]_, Equation 37. This algorithm uses part (i) of Lemma 1 of [tian03a]_. :param district: A list of variables comprising the district for which we're computing a C factor. :param graph_probability: the Q value for the full graph. :param ordering: a topological sort of the vertices in the graph. :returns: An expression for Q[district]. :raises TypeError: the district or variable set from which it is drawn contained no variables. :raises KeyError: a variable in the district is not in the topological sort of the graph vertices. :math: Let a topological order over $V$ be $V_{1} < \ldots < V_{n}$, and let $V^{(i)}=\{V_{1},\ldots,V_{i}\}$, $i = 1,\ldots,n$, and $V^{(0)} = \emptyset$. For any set $C$, let $G_{C}$ denote the subgraph of $G$ composed only of variables in $C$. Then each c-factor $Q_{j}$, $j=1,\ldots,k$, is identifiable and is given by \begin{equation} $Q_{j} = \prod_\limit{\{i|V_{i}\in S_{j}\}}{P(v_i}|v^{(i-1}))}$. \end{equation} """ # (Topological sort is O(V+E): https://stackoverflow.com/questions/31010922/) variables = set(ordering) logger.debug( "In _compute_c_factor_conditioning_on_topological_predecessors: topo = " + str(ordering) ) logger.debug( "In _compute_c_factor_conditioning_on_topological_predecessors: graph_probability = " + graph_probability.to_latex() ) if len(district) == 0 or len(variables) == 0: raise TypeError( "Error in _compute_c_factor_conditioning_on_topological_predecessors: the district or variable " + "set from which it is drawn contained no variables." ) if any(v not in variables for v in district): raise KeyError( "Error in _compute_c_factor_conditioning_on_topological_predecessors: a variable in the district" + " is not in the topological sort of the graph vertices." ) if isinstance(graph_probability, PopulationProbability): population_probabilities = [] # A little subtle so it deserves a comment: the Q value passed into Tian's Identify function may # already be conditioned on some variables that are in G but not in the subgraph H. In applying Lemma 1 # (but not Lemma 4), we have to make sure we're also conditioning on those variables. graph_probability_parents = set(graph_probability.parents) for variable in district: preceding_variables = ordering[: ordering.index(variable)] conditioned_variables = graph_probability_parents.union(preceding_variables) # V^(i-1) pp = PopulationProbability( population=graph_probability.population, distribution=Distribution( children=frozenset([variable]), parents=frozenset(conditioned_variables) ), ) population_probabilities.append(pp) logger.debug( "In _compute_c_factor_conditioning_on_topological_predecessors: returning " + str(Product.safe(population_probabilities)) ) logger.debug( "Return value in Latex form is " + Product.safe(population_probabilities).to_latex() ) return Product.safe(population_probabilities) else: probabilities = [] # A little subtle so it deserves a comment: the Q value passed into Tian's Identify function may # already be conditioned on some variables that are in G but not in the subgraph H. In applying Lemma 1 # (but not Lemma 4), we have to make sure we're also conditioning on those variables. graph_probability_parents = set(graph_probability.parents) for variable in district: preceding_variables = ordering[: ordering.index(variable)] conditioned_variables = graph_probability_parents.union(preceding_variables) # V^(i-1) probability = P(variable | conditioned_variables) # v_i probabilities.append(probability) logger.debug( "In _compute_c_factor_conditioning_on_topological_predecessors: returning " + str(Product.safe(probabilities)) ) logger.debug("Return value in Latex form is " + Product.safe(probabilities).to_latex()) return Product.safe(probabilities)
[docs] def compute_q_value_of_variables_with_low_topological_ordering_indices( *, node: Variable | None, expression: Expression, # Q[H] ordering: Sequence[Variable], ) -> Expression: r"""Compute the Q value of a set of variables according to [tian03a]_, Equation 72. :math: Let $H \subseteq V$, and assume that $H$ is partitioned into c-components $H_{1}, \dots, V_{h_{l}}$ in the subgraph $G_{H}$. Then we have... (ii) Let $k$ be the number of variables in $H$, and let a topological order of the variables in $H$ be $V_{h_{1}} < \cdots < V_{h_{k}}$ be the set of variables in $G_{H}$. Let $H^{(i)} = {V_{h_{1}},\dots,V_{h_{i}}$ be the set of variables in $H$ ordered before $V_{h_{i}}$ (including $V_{h_{i}}$), $i=1,\dots,k$, and $H^{(0)} = \emptyset$. Then each $Q[H_{j}]$, $j = 1,\dots,l$, is computable from $Q[H]$ and is given by \begin{equation} $Q[H_{j} = \prod_{\{i|V_{h_{i}}\in H_{j}\}}{\frac{Q[H^{(i)}]}{Q[H^{(i-1)}]},$ \end{equation} where each $Q[H^{(i)}], i = 0, 1, \dots\, k$, is given by \begin{equation} $Q[H^{(i)}] = \sum_{h \backslash h^{(i)}}{Q[H]}$. \end{equation} (The second equation above is Equation 72.) :param node: The $i^{th}$ variable in topological order :param expression: The probability of $H$ corresponding to $Q[H]$ in Equation 72. :param ordering: a topological sorting of the subgraph under analysis, $G_{H}$. :returns: An expression for $Q[H^{(i)}]$. :raises KeyError: the input vertex is not in the variable set or not in the topological ordering of graph vertices. """ # $Q[H^{(0)}] = Q[\emptyset] = 1$ if node is None: return One() logger.debug( "In _compute_q_value_of_variables_with_low_topological_ordering_indices: input vertex is " + str(node) ) logger.debug(" and ordering is %s", ordering) if node not in ordering: raise KeyError( "In _compute_q_value_of_variables_with_low_topological_ordering_indices: input vertex " + "%s is not in the input graph.", node, ) ranges = ordering[ordering.index(node) + 1 :] return Sum.safe(expression, ranges)
[docs] def compute_c_factor_marginalizing_over_topological_successors( *, district: Collection[Variable], graph_probability: Expression, ordering: Sequence[Variable] ) -> Expression: r"""Compute the Q value associated with the C-component (district) in a graph as per [tian03a]_, eqns. 71 and 72. This algorithm uses part (ii) of Lemma 4 of [tian03a]_. The context for Equations 71 and 72 follow: :param district: A list of variables comprising the district for which we're computing a C factor. :param graph_probability: The expression $Q$ corresponding to the set of variables in $v$. It is $Q[A]$ on the line calling Lemma 4 in [tian03a]_, Figure 7. :param ordering: a topological ordering of the vertices in the subgraph $G_{H}$ in question. :returns: An expression for Q[district]. :math: Let $H \subseteq V$, and assume that $H$ is partitioned into c-components $H_{1}, \dots, V_{h_{l}}$ in the subgraph $G_{H}$. Then we have... (ii) Let $k$ be the number of variables in $H$, and let a topological order of the variables in $H$ be $V_{h_{1}} < \cdots < V_{h_{k}}$ be the set of variables in $G_{H}$. Let $H^{(i)} = {V_{h_{1}},\dots,V_{h_{i}}$ be the set of variables in $H$ ordered before $V_{h_{i}}$ (including $V_{h_{i}}$), $i=1,\dots,k$, and $H^{(0)} = \emptyset$. Then each $Q[H_{j}]$, $j = 1,\dots,l$, is computable from $Q[H]$ and is given by \begin{equation} $Q[H_{j} = \prod_{\{i|V_{h_{i}}\in H_{j}\}}{\frac{Q[H^{(i)}]}{Q[H^{(i-1)}]},$ \end{equation} where each $Q[H^{(i)}], i = 0, 1, \dots\, k$, is given by \begin{equation} $Q[H^{(i)}] = \sum_{h \backslash h^{(i)}}{Q[H]}$. \end{equation} """ def _get_expression_from_index(index: int) -> Expression: # Compute $Q[H^{i}]$ given i current_index_expr = compute_q_value_of_variables_with_low_topological_ordering_indices( node=ordering[index], expression=graph_probability, ordering=ordering ) if index == 0: return current_index_expr previous_index_expr = compute_q_value_of_variables_with_low_topological_ordering_indices( node=ordering[index - 1], expression=graph_probability, ordering=ordering ) return Fraction(current_index_expr, previous_index_expr) rv = Product.safe(_get_expression_from_index(ordering.index(vertex)) for vertex in district) # TODO simplify this product by cancelling terms in the numerator and denominator. return rv
[docs] def compute_c_factor( *, district: Collection[Variable], subgraph_variables: Collection[Variable], subgraph_probability: Expression, ordering: Sequence[Variable], ) -> Expression: """Compute the Q value associated with the C-component (district) in a graph as per [tian03a]_ and [tikka20a]_. This algorithm uses both Lemma 1, part (i) of Tian03a (Equation 37) and Lemma 4 of Tian 03a (Equations 71 and 72). :param district: A list of variables comprising the district C for which we're computing a C factor. :param subgraph_variables: The variables in the subgraph T under analysis. :param subgraph_probability: The expression Q corresponding to the set of variables in T. As an example, this quantity would be Q[A] on the line calling Lemma 4 in [tian03a]_, Figure 7. :param ordering: A list of variables in topological order that includes all variables in G, where T is contained in G. :returns: An expression for Q[district]. :raises TypeError: In _compute_c_factor: expected the subgraph_probability parameter to be a simple probability. """ # The graph_topo is the ordering of vertices in G, but the lemmas use the topological sorting in a subgraph H of G. # We take in the topological ordering of G to make testing easier, as there could be multiple ways to # sort the vertices in H topologically. It is also faster as topological sort is O(V+E) and getting # subgraph_topo below is O(V). subgraph_ordering = [v for v in ordering if v in subgraph_variables] if isinstance(subgraph_probability, Fraction | Product | Sum): # Lemma 4 return compute_c_factor_marginalizing_over_topological_successors( district=district, graph_probability=subgraph_probability, ordering=subgraph_ordering ) elif isinstance(subgraph_probability, Probability): # Lemma 1 return compute_c_factor_conditioning_on_topological_predecessors( district=district, graph_probability=subgraph_probability, ordering=subgraph_ordering ) else: raise TypeError( f"Expected fraction, product, sum, or probability. got: {subgraph_probability}" )
[docs] def compute_ancestral_set_q_value( *, ancestral_set: set[Variable], # A subgraph_variables: set[Variable], # T subgraph_probability: Expression, ordering: Sequence[Variable], # topological ordering of variables in a graph containing T ) -> Expression: r"""Compute the Q value associated with a subgraph as per Equation 69 of [tian03a]_. This algorithm uses Lemma 3 of [tian03a]_ (Equation 69). :param ancestral_set: A set of variables (W in Equation 69, A in Figure 7 of [tian03a]_) that comprise the ancestral set of some other variables (unspecified in Equation 69, and C in Figure 7 of [tian03a]_). :param subgraph_variables: The variables in the subgraph under analysis (C in Equation 69, and T in Figure 7). :param subgraph_probability: The expression Q corresponding to Q[C] in Equation 69 and Q[T] in Figure 7. :param ordering: A list of variables in topological order that includes all variables in the graph (i.e., V in Equation 69 and G in Figure 7). :returns: An expression for Q[ancestral_set]. :math: Let $W \subseteq C \subseteq V$, and $W' = C \backslash W$. If $W$ is an ancestral set in the subgraph $G_{C}$ $(An(W)_{G_{C}} = W)$, or equivalently, if none of the parents of $W$ is in $W'$ $(Pa(W) \cap W' = \emptyset)$, then \begin{equation} \sum\limits_{W'}{Q[C]=Q[W]} \end{equation} """ # T\A is W', so Sum_{T\A}{Q[T]} = Q[A], corresponding to Sum_{W'}{Q[C]} = Q[W] in Equation 69 # The next two lines are included so the summation shows the marginalization variables in topological order marginalization_set = subgraph_variables - ancestral_set marginalization_variables = [v for v in ordering if v in marginalization_set] return Sum.safe(subgraph_probability, marginalization_variables)