"""An implementation of Tian and Pearl's identification algorithm from [tian03a]_.
.. [tikka20a] https://github.com/santikka/causaleffect/blob/master/R/compute.c.factor.R.
.. [tikka20b] https://github.com/santikka/causaleffect/blob/master/R/identify.R.
.. [tian03a] Tian J, Pearl J (2003). `On the Identification of Causal Effects
<https://ftp.cs.ucla.edu/pub/stat_ser/R290-L.pdf>`_. Technical report, Department of
Computer Science, University of California, Los Angeles. R-290-L
"""
import logging
from collections.abc import Collection, Sequence
from y0.dsl import (
Distribution,
Expression,
Fraction,
One,
P,
PopulationProbability,
Probability,
Product,
Sum,
Variable,
)
from y0.graph import NxMixedGraph
__all__ = [
"compute_ancestral_set_q_value",
"compute_c_factor",
"compute_c_factor_conditioning_on_topological_predecessors",
"compute_c_factor_marginalizing_over_topological_successors",
"compute_q_value_of_variables_with_low_topological_ordering_indices",
"identify_district_variables",
]
logger = logging.getLogger(__name__)
[docs]
def identify_district_variables( # noqa:C901
*,
input_variables: set[Variable],
input_district: set[Variable],
district_probability: Expression,
graph: NxMixedGraph,
ordering: Sequence[Variable],
) -> Expression | None:
"""Implement the IDENTIFY algorithm as presented in [tian03a]_ with pseudocode in [correa22a]_ (Algorithm 5).
Tikka and colleagues implemented this algorithm in the R package Causal Effect
([tikka20b]_). We draw from that implementation. Their version also keeps track of
the structure of calls, while this one does not.
:param input_variables: The set of variables, C, for which we're checking if causal
identification is possible.
:param input_district: The C-component, T, containing C.
:param district_probability: The expression $Q[T]$ as per [tian03a]_, Equation 34.
Because T is a single district, $Q[T]$ is the "post-intervention distribution of
the variables in T, under an intervention that sets all other variables to
constants" (see Equation 36 of [tian03a]_).
:param graph: The relevant graph.
:param ordering: A list of variables in topological order that includes all
variables in the graph and may contain more.
:returns: An expression for $Q[C]$ in terms of $Q$, or Fail.
:raises KeyError: at least one input variable is not in the input district or at
least one input district variable is not in the topologically sorted list of
graph variables.
:raises TypeError: the subgraph of the input graph G comprised of the vertices in
the input vertex set T should have only district and has more.
:raises NotImplementedError: If we get to the end of the conditional, which still
needs an "else"
"""
if input_variables.intersection(input_district) != input_variables:
# if not all(v in input_district for v in input_variables):
raise KeyError(
"In identify_district_variables: at least one of the input variables C is not in the input district T."
)
if input_district.intersection(ordering) != input_district:
raise KeyError(
"In identify_district_variables: at least one input district variable is not in the "
"topologically sorted variable list."
)
district_subgraph = graph.subgraph(input_district) # $G_{T}$
if len(district_subgraph.districts()) > 1:
raise TypeError(
"In identify_district_variables: the subgraph of the input graph G comprised of the"
" vertices in the input vertex set T should have only district and has more."
)
if (not isinstance(district_probability, Expression)) | isinstance(
district_probability, Expression
) and not (isinstance(district_probability, Sum | Product | Fraction | Probability)):
raise TypeError(
"In identify_district_variables: the district probability must be an expression that is a "
"Sum, Product, Fraction, or Probability."
)
# A = Ancestors of C in $G_{T}$
ancestral_set = district_subgraph.ancestors_inclusive(input_variables)
# Next, Tikka has an additional line intersecting the ancestral set with the set T in case any C was not in T,
# but we raise an error in that case as a pre-processing step, so we omit that line.
ordered_ancestral_set = [a for a in ordering if a in ancestral_set]
if ancestral_set == input_variables:
logger.debug("In identify_district_variables: A = C. Applying Lemma 3.")
logger.debug(" Subgraph_probability = " + district_probability.to_latex())
return compute_ancestral_set_q_value(
ancestral_set=ancestral_set,
subgraph_variables=input_district,
subgraph_probability=district_probability,
ordering=ordering,
)
elif ancestral_set == input_district:
logger.debug("In identify_district_variables: A = T. Returning None (i.e., FAIL).")
logger.debug(" A = " + str(input_district))
logger.debug(" T = " + str(ancestral_set))
return None
elif input_variables.issubset(ancestral_set) and ancestral_set.issubset(input_district):
ancestral_set_subgraph = graph.subgraph(ordered_ancestral_set)
ancestral_set_subgraph_districts = list(ancestral_set_subgraph.districts())
targeted_ancestral_set_subgraph_district = set(
ancestral_set_subgraph_districts[
[
input_variables.issubset(district)
for district in ancestral_set_subgraph_districts
].index(True)
]
)
# t_prime = [district for district in ancestral_set_subgraph_districts if
# input_variables.intersect(district)==input_variables][0]
# ordered_t_prime_vertices = [v for v in topo if v in t_prime]
# t_one = t_prime.intersection(ancestral_set) # RC: This line is in Tikka
if isinstance(district_probability, Fraction | Product | Sum): # Compute Q[A] from Lemma 3
ancestral_set_probability = compute_ancestral_set_q_value(
ancestral_set=ancestral_set,
subgraph_variables=input_district,
subgraph_probability=district_probability, # Q[T]
ordering=ordering,
)
elif isinstance(district_probability, Probability):
distribution = (
ordered_ancestral_set[0].joint(ordered_ancestral_set[1:])
| district_probability.parents
)
if isinstance(district_probability, PopulationProbability):
ancestral_set_probability = PopulationProbability(
population=district_probability.population,
distribution=distribution,
)
else:
ancestral_set_probability = P(distribution)
logger.debug(
"Got ancestral_set_probability. Result = " + ancestral_set_probability.to_latex()
)
else:
raise TypeError(
"In identify_district_variables: the district probability is an expression of an unknown type."
)
# Get Q[T'] by Lemma 4 or Lemma 1
logger.debug(
"In identify_district_variables: about to call _compute_c_factor. Subgraph_probability = "
+ ancestral_set_probability.to_latex()
)
targeted_ancestral_set_subgraph_district_probability = compute_c_factor(
district=targeted_ancestral_set_subgraph_district,
subgraph_variables=ancestral_set,
subgraph_probability=ancestral_set_probability,
ordering=ordering,
)
logger.debug(
"In identify_district_variables: about to recursively call identify_district_variables."
)
logger.debug(" C = " + str(input_variables))
logger.debug(" T' = " + str(targeted_ancestral_set_subgraph_district))
logger.debug(" Q[T'] =" + str(targeted_ancestral_set_subgraph_district_probability))
logger.debug(" graph nodes = " + str(list(graph.nodes())))
logger.debug(" topo = " + str(ordering))
return identify_district_variables(
input_variables=input_variables,
input_district=targeted_ancestral_set_subgraph_district,
district_probability=targeted_ancestral_set_subgraph_district_probability,
graph=graph,
ordering=ordering,
)
else:
raise NotImplementedError
[docs]
def compute_c_factor_conditioning_on_topological_predecessors(
*,
district: Collection[Variable],
graph_probability: Probability,
ordering: Sequence[Variable],
) -> Expression:
r"""Compute the Q value associated with the C-component (district) in a graph as per [tian03a]_, Equation 37.
This algorithm uses part (i) of Lemma 1 of [tian03a]_.
:param district: A list of variables comprising the district for which we're
computing a C factor.
:param graph_probability: the Q value for the full graph.
:param ordering: a topological sort of the vertices in the graph.
:returns: An expression for Q[district].
:raises TypeError: the district or variable set from which it is drawn contained no
variables.
:raises KeyError: a variable in the district is not in the topological sort of the
graph vertices.
:math: Let a topological order over $V$ be $V_{1} < \ldots < V_{n}$, and let
$V^{(i)}=\{V_{1},\ldots,V_{i}\}$, $i = 1,\ldots,n$, and $V^{(0)} = \emptyset$.
For any set $C$, let $G_{C}$ denote the subgraph of $G$ composed only of
variables in $C$. Then each c-factor $Q_{j}$, $j=1,\ldots,k$, is identifiable
and is given by
\begin{equation} $Q_{j} = \prod_\limit{\{i|V_{i}\in
S_{j}\}}{P(v_i}|v^{(i-1}))}$. \end{equation}
"""
# (Topological sort is O(V+E): https://stackoverflow.com/questions/31010922/)
variables = set(ordering)
logger.debug(
"In _compute_c_factor_conditioning_on_topological_predecessors: topo = " + str(ordering)
)
logger.debug(
"In _compute_c_factor_conditioning_on_topological_predecessors: graph_probability = "
+ graph_probability.to_latex()
)
if len(district) == 0 or len(variables) == 0:
raise TypeError(
"Error in _compute_c_factor_conditioning_on_topological_predecessors: the district or variable "
+ "set from which it is drawn contained no variables."
)
if any(v not in variables for v in district):
raise KeyError(
"Error in _compute_c_factor_conditioning_on_topological_predecessors: a variable in the district"
+ " is not in the topological sort of the graph vertices."
)
if isinstance(graph_probability, PopulationProbability):
population_probabilities = []
# A little subtle so it deserves a comment: the Q value passed into Tian's Identify function may
# already be conditioned on some variables that are in G but not in the subgraph H. In applying Lemma 1
# (but not Lemma 4), we have to make sure we're also conditioning on those variables.
graph_probability_parents = set(graph_probability.parents)
for variable in district:
preceding_variables = ordering[: ordering.index(variable)]
conditioned_variables = graph_probability_parents.union(preceding_variables) # V^(i-1)
pp = PopulationProbability(
population=graph_probability.population,
distribution=Distribution(
children=frozenset([variable]), parents=frozenset(conditioned_variables)
),
)
population_probabilities.append(pp)
logger.debug(
"In _compute_c_factor_conditioning_on_topological_predecessors: returning "
+ str(Product.safe(population_probabilities))
)
logger.debug(
"Return value in Latex form is " + Product.safe(population_probabilities).to_latex()
)
return Product.safe(population_probabilities)
else:
probabilities = []
# A little subtle so it deserves a comment: the Q value passed into Tian's Identify function may
# already be conditioned on some variables that are in G but not in the subgraph H. In applying Lemma 1
# (but not Lemma 4), we have to make sure we're also conditioning on those variables.
graph_probability_parents = set(graph_probability.parents)
for variable in district:
preceding_variables = ordering[: ordering.index(variable)]
conditioned_variables = graph_probability_parents.union(preceding_variables) # V^(i-1)
probability = P(variable | conditioned_variables) # v_i
probabilities.append(probability)
logger.debug(
"In _compute_c_factor_conditioning_on_topological_predecessors: returning "
+ str(Product.safe(probabilities))
)
logger.debug("Return value in Latex form is " + Product.safe(probabilities).to_latex())
return Product.safe(probabilities)
[docs]
def compute_q_value_of_variables_with_low_topological_ordering_indices(
*,
node: Variable | None,
expression: Expression, # Q[H]
ordering: Sequence[Variable],
) -> Expression:
r"""Compute the Q value of a set of variables according to [tian03a]_, Equation 72.
:math: Let $H \subseteq V$, and assume that $H$ is partitioned into c-components
$H_{1}, \dots, V_{h_{l}}$ in the subgraph $G_{H}$. Then we have...
(ii) Let $k$ be the number of variables in $H$, and let a topological order of
the variables in $H$ be $V_{h_{1}} < \cdots < V_{h_{k}}$ be the set of variables
in $G_{H}$. Let $H^{(i)} = {V_{h_{1}},\dots,V_{h_{i}}$ be the set of variables
in $H$ ordered before $V_{h_{i}}$ (including $V_{h_{i}}$), $i=1,\dots,k$, and
$H^{(0)} = \emptyset$. Then each $Q[H_{j}]$, $j = 1,\dots,l$, is computable from
$Q[H]$ and is given by
\begin{equation} $Q[H_{j} = \prod_{\{i|V_{h_{i}}\in
H_{j}\}}{\frac{Q[H^{(i)}]}{Q[H^{(i-1)}]},$ \end{equation}
where each $Q[H^{(i)}], i = 0, 1, \dots\, k$, is given by
\begin{equation} $Q[H^{(i)}] = \sum_{h \backslash h^{(i)}}{Q[H]}$.
\end{equation}
(The second equation above is Equation 72.)
:param node: The $i^{th}$ variable in topological order
:param expression: The probability of $H$ corresponding to $Q[H]$ in Equation 72.
:param ordering: a topological sorting of the subgraph under analysis, $G_{H}$.
:returns: An expression for $Q[H^{(i)}]$.
:raises KeyError: the input vertex is not in the variable set or not in the
topological ordering of graph vertices.
"""
# $Q[H^{(0)}] = Q[\emptyset] = 1$
if node is None:
return One()
logger.debug(
"In _compute_q_value_of_variables_with_low_topological_ordering_indices: input vertex is "
+ str(node)
)
logger.debug(" and ordering is %s", ordering)
if node not in ordering:
raise KeyError(
"In _compute_q_value_of_variables_with_low_topological_ordering_indices: input vertex "
+ "%s is not in the input graph.",
node,
)
ranges = ordering[ordering.index(node) + 1 :]
return Sum.safe(expression, ranges)
[docs]
def compute_c_factor_marginalizing_over_topological_successors(
*, district: Collection[Variable], graph_probability: Expression, ordering: Sequence[Variable]
) -> Expression:
r"""Compute the Q value associated with the C-component (district) in a graph as per [tian03a]_, eqns. 71 and 72.
This algorithm uses part (ii) of Lemma 4 of [tian03a]_. The context for Equations 71
and 72 follow:
:param district: A list of variables comprising the district for which we're
computing a C factor.
:param graph_probability: The expression $Q$ corresponding to the set of variables
in $v$. It is $Q[A]$ on the line calling Lemma 4 in [tian03a]_, Figure 7.
:param ordering: a topological ordering of the vertices in the subgraph $G_{H}$ in
question.
:returns: An expression for Q[district].
:math: Let $H \subseteq V$, and assume that $H$ is partitioned into c-components
$H_{1}, \dots, V_{h_{l}}$ in the subgraph $G_{H}$. Then we have...
(ii) Let $k$ be the number of variables in $H$, and let a topological order of
the variables in $H$ be $V_{h_{1}} < \cdots < V_{h_{k}}$ be the set of variables
in $G_{H}$. Let $H^{(i)} = {V_{h_{1}},\dots,V_{h_{i}}$ be the set of variables
in $H$ ordered before $V_{h_{i}}$ (including $V_{h_{i}}$), $i=1,\dots,k$, and
$H^{(0)} = \emptyset$. Then each $Q[H_{j}]$, $j = 1,\dots,l$, is computable from
$Q[H]$ and is given by
\begin{equation} $Q[H_{j} = \prod_{\{i|V_{h_{i}}\in
H_{j}\}}{\frac{Q[H^{(i)}]}{Q[H^{(i-1)}]},$ \end{equation}
where each $Q[H^{(i)}], i = 0, 1, \dots\, k$, is given by
\begin{equation} $Q[H^{(i)}] = \sum_{h \backslash h^{(i)}}{Q[H]}$.
\end{equation}
"""
def _get_expression_from_index(index: int) -> Expression: # Compute $Q[H^{i}]$ given i
current_index_expr = compute_q_value_of_variables_with_low_topological_ordering_indices(
node=ordering[index], expression=graph_probability, ordering=ordering
)
if index == 0:
return current_index_expr
previous_index_expr = compute_q_value_of_variables_with_low_topological_ordering_indices(
node=ordering[index - 1], expression=graph_probability, ordering=ordering
)
return Fraction(current_index_expr, previous_index_expr)
rv = Product.safe(_get_expression_from_index(ordering.index(vertex)) for vertex in district)
# TODO simplify this product by cancelling terms in the numerator and denominator.
return rv
[docs]
def compute_c_factor(
*,
district: Collection[Variable],
subgraph_variables: Collection[Variable],
subgraph_probability: Expression,
ordering: Sequence[Variable],
) -> Expression:
"""Compute the Q value associated with the C-component (district) in a graph as per [tian03a]_ and [tikka20a]_.
This algorithm uses both Lemma 1, part (i) of Tian03a (Equation 37) and Lemma 4 of
Tian 03a (Equations 71 and 72).
:param district: A list of variables comprising the district C for which we're
computing a C factor.
:param subgraph_variables: The variables in the subgraph T under analysis.
:param subgraph_probability: The expression Q corresponding to the set of variables
in T. As an example, this quantity would be Q[A] on the line calling Lemma 4 in
[tian03a]_, Figure 7.
:param ordering: A list of variables in topological order that includes all
variables in G, where T is contained in G.
:returns: An expression for Q[district].
:raises TypeError: In _compute_c_factor: expected the subgraph_probability parameter
to be a simple probability.
"""
# The graph_topo is the ordering of vertices in G, but the lemmas use the topological sorting in a subgraph H of G.
# We take in the topological ordering of G to make testing easier, as there could be multiple ways to
# sort the vertices in H topologically. It is also faster as topological sort is O(V+E) and getting
# subgraph_topo below is O(V).
subgraph_ordering = [v for v in ordering if v in subgraph_variables]
if isinstance(subgraph_probability, Fraction | Product | Sum):
# Lemma 4
return compute_c_factor_marginalizing_over_topological_successors(
district=district, graph_probability=subgraph_probability, ordering=subgraph_ordering
)
elif isinstance(subgraph_probability, Probability):
# Lemma 1
return compute_c_factor_conditioning_on_topological_predecessors(
district=district, graph_probability=subgraph_probability, ordering=subgraph_ordering
)
else:
raise TypeError(
f"Expected fraction, product, sum, or probability. got: {subgraph_probability}"
)
[docs]
def compute_ancestral_set_q_value(
*,
ancestral_set: set[Variable], # A
subgraph_variables: set[Variable], # T
subgraph_probability: Expression,
ordering: Sequence[Variable], # topological ordering of variables in a graph containing T
) -> Expression:
r"""Compute the Q value associated with a subgraph as per Equation 69 of [tian03a]_.
This algorithm uses Lemma 3 of [tian03a]_ (Equation 69).
:param ancestral_set: A set of variables (W in Equation 69, A in Figure 7 of
[tian03a]_) that comprise the ancestral set of some other variables (unspecified
in Equation 69, and C in Figure 7 of [tian03a]_).
:param subgraph_variables: The variables in the subgraph under analysis (C in
Equation 69, and T in Figure 7).
:param subgraph_probability: The expression Q corresponding to Q[C] in Equation 69
and Q[T] in Figure 7.
:param ordering: A list of variables in topological order that includes all
variables in the graph (i.e., V in Equation 69 and G in Figure 7).
:returns: An expression for Q[ancestral_set].
:math: Let $W \subseteq C \subseteq V$, and $W' = C \backslash W$. If $W$ is an
ancestral set in the subgraph $G_{C}$ $(An(W)_{G_{C}} = W)$, or equivalently, if
none of the parents of $W$ is in $W'$ $(Pa(W) \cap W' = \emptyset)$, then
\begin{equation} \sum\limits_{W'}{Q[C]=Q[W]} \end{equation}
"""
# T\A is W', so Sum_{T\A}{Q[T]} = Q[A], corresponding to Sum_{W'}{Q[C]} = Q[W] in Equation 69
# The next two lines are included so the summation shows the marginalization variables in topological order
marginalization_set = subgraph_variables - ancestral_set
marginalization_variables = [v for v in ordering if v in marginalization_set]
return Sum.safe(subgraph_probability, marginalization_variables)