mumin.id_updator

Class that updates the precomputed IDs

View Source
"""Class that updates the precomputed IDs"""

from typing import Dict, Tuple

import numpy as np
import pandas as pd


class IdUpdator:
    """Class that updates the IDs of nodes and relations.

    The relation dataframes store raw IDs in their `src` and `tgt` columns.
    The methods of this class replace those IDs with the index values of the
    matching rows in the node dataframes.
    """

    def update_all(
        self,
        nodes: Dict[str, pd.DataFrame],
        rels: Dict[Tuple[str, str, str], pd.DataFrame],
    ) -> Tuple[dict, dict]:
        """Update the IDs of all supported relations and drop node ID columns.

        Every supported relation that is present in `rels` has its `src` and
        `tgt` columns rewritten to node dataframe index values. Afterwards the
        `id` column is dropped from the claim and article dataframes, if those
        dataframes are present.

        Both dictionaries are modified in place and are also returned.

        Args:
            nodes (Dict[str, pd.DataFrame]):
                A dictionary of node dataframes.
            rels (Dict[Tuple[str, str, str], pd.DataFrame]):
                A dictionary of relation dataframes.

        Returns:
            pair of dicts:
                A tuple of updated node and relation dictionaries.

        Raises:
            KeyError:
                If a relation is present in `rels` but a node dataframe that
                it refers to is missing from `nodes`.
        """
        # Each entry: relation key, the method updating it, and a mapping from
        # that method's keyword arguments to the node types it needs.
        updaters = (
            (
                ("tweet", "discusses", "claim"),
                self._update_tweet_discusses_claim,
                dict(tweet_df="tweet", claim_df="claim"),
            ),
            (
                ("article", "discusses", "claim"),
                self._update_article_discusses_claim,
                dict(article_df="article", claim_df="claim"),
            ),
            (
                ("user", "follows", "user"),
                self._update_user_follows_user,
                dict(user_df="user"),
            ),
            (
                ("reply", "reply_to", "tweet"),
                self._update_reply_reply_to_tweet,
                dict(reply_df="reply", tweet_df="tweet"),
            ),
            (
                ("reply", "quote_of", "tweet"),
                self._update_reply_quote_of_tweet,
                dict(reply_df="reply", tweet_df="tweet"),
            ),
            (
                ("user", "retweeted", "tweet"),
                self._update_user_retweeted_tweet,
                dict(user_df="user", tweet_df="tweet"),
            ),
        )
        for rel, updater, node_args in updaters:
            if rel in rels:
                node_dfs = {
                    arg: nodes[node_type] for arg, node_type in node_args.items()
                }
                rels[rel] = updater(rel_df=rels[rel], **node_dfs)

        # The ID columns are only dropped after all relations are updated,
        # since the claim and article relations are matched on them.
        for node_type in ("claim", "article"):
            if node_type in nodes:
                nodes[node_type] = self._remove_id_column(node_df=nodes[node_type])

        return nodes, rels

    @staticmethod
    def _reindex_relation(
        rel_df: pd.DataFrame,
        src_df: pd.DataFrame,
        src_id_col: str,
        tgt_df: pd.DataFrame,
        tgt_id_col: str,
    ) -> pd.DataFrame:
        """Replace the raw IDs of a relation with node dataframe index values.

        Args:
            rel_df (pd.DataFrame):
                The relation dataframe, with raw IDs in `src` and `tgt`.
            src_df (pd.DataFrame):
                The node dataframe that the `src` column refers to.
            src_id_col (str):
                The column of `src_df` holding the raw IDs.
            tgt_df (pd.DataFrame):
                The node dataframe that the `tgt` column refers to.
            tgt_id_col (str):
                The column of `tgt_df` holding the raw IDs.

        Returns:
            pd.DataFrame:
                `rel_df` itself if it has no rows. Otherwise a new dataframe
                with `src` and `tgt` columns holding index values, containing
                only the rows whose IDs were found in both node dataframes.
                If no row matches, a dataframe without any columns.
        """
        if len(rel_df) == 0:
            return rel_df

        def build_lookup(node_df: pd.DataFrame, id_col: str, prefix: str):
            # Name both the ID column and the index explicitly, so the lookup
            # has the same two columns whether or not the index is named.
            return (
                node_df[id_col]
                .rename(prefix + "_key")
                .rename_axis(prefix + "_idx")
                .reset_index()
            )

        # Inner merges: relation rows pointing at unknown nodes are discarded.
        merged = (
            rel_df[["src", "tgt"]]
            .astype(np.uint64)
            .merge(
                build_lookup(src_df, src_id_col, "src"),
                left_on="src",
                right_on="src_key",
            )
            .merge(
                build_lookup(tgt_df, tgt_id_col, "tgt"),
                left_on="tgt",
                right_on="tgt_key",
            )
        )
        if len(merged) == 0:
            return pd.DataFrame()

        return pd.DataFrame(
            dict(src=merged["src_idx"].tolist(), tgt=merged["tgt_idx"].tolist())
        )

    def _update_tweet_discusses_claim(
        self, rel_df: pd.DataFrame, tweet_df: pd.DataFrame, claim_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:Tweet)-[:DISCUSSES]->(:Claim) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            tweet_df (pd.DataFrame): The tweet dataframe.
            claim_df (pd.DataFrame): The claim dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=tweet_df,
            src_id_col="tweet_id",
            tgt_df=claim_df,
            tgt_id_col="id",
        )

    def _update_article_discusses_claim(
        self, rel_df: pd.DataFrame, article_df: pd.DataFrame, claim_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:Article)-[:DISCUSSES]->(:Claim) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            article_df (pd.DataFrame): The article dataframe.
            claim_df (pd.DataFrame): The claim dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=article_df,
            src_id_col="id",
            tgt_df=claim_df,
            tgt_id_col="id",
        )

    def _update_user_follows_user(
        self, rel_df: pd.DataFrame, user_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:User)-[:FOLLOWS]->(:User) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            user_df (pd.DataFrame): The user dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=user_df,
            src_id_col="user_id",
            tgt_df=user_df,
            tgt_id_col="user_id",
        )

    def _update_reply_reply_to_tweet(
        self, rel_df: pd.DataFrame, reply_df: pd.DataFrame, tweet_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:Reply)-[:REPLY_TO]->(:Tweet) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            reply_df (pd.DataFrame): The reply dataframe.
            tweet_df (pd.DataFrame): The tweet dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=reply_df,
            src_id_col="tweet_id",
            tgt_df=tweet_df,
            tgt_id_col="tweet_id",
        )

    def _update_reply_quote_of_tweet(
        self, rel_df: pd.DataFrame, reply_df: pd.DataFrame, tweet_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:Reply)-[:QUOTE_OF]->(:Tweet) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            reply_df (pd.DataFrame): The reply dataframe.
            tweet_df (pd.DataFrame): The tweet dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=reply_df,
            src_id_col="tweet_id",
            tgt_df=tweet_df,
            tgt_id_col="tweet_id",
        )

    def _update_user_retweeted_tweet(
        self, rel_df: pd.DataFrame, user_df: pd.DataFrame, tweet_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:User)-[:RETWEETED]->(:Tweet) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            user_df (pd.DataFrame): The user dataframe.
            tweet_df (pd.DataFrame): The tweet dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=user_df,
            src_id_col="user_id",
            tgt_df=tweet_df,
            tgt_id_col="tweet_id",
        )

    def _remove_id_column(self, node_df: pd.DataFrame) -> pd.DataFrame:
        """Remove the id column from the node dataframe, if it has one.

        Args:
            node_df (pd.DataFrame): The node dataframe.

        Returns:
            pd.DataFrame: The node dataframe without the id column.
        """
        # `errors="ignore"` covers dataframes that have no `id` column, such
        # as a column-less empty dataframe, without a separate length check.
        return node_df.drop(columns="id", errors="ignore")
#   class IdUpdator:
View Source
class IdUpdator:
    """Class that updates the IDs of nodes and relations.

    The relation dataframes store raw IDs in their `src` and `tgt` columns.
    The methods of this class replace those IDs with the index values of the
    matching rows in the node dataframes.
    """

    def update_all(
        self,
        nodes: Dict[str, pd.DataFrame],
        rels: Dict[Tuple[str, str, str], pd.DataFrame],
    ) -> Tuple[dict, dict]:
        """Update the IDs of all supported relations and drop node ID columns.

        Every supported relation that is present in `rels` has its `src` and
        `tgt` columns rewritten to node dataframe index values. Afterwards the
        `id` column is dropped from the claim and article dataframes, if those
        dataframes are present.

        Both dictionaries are modified in place and are also returned.

        Args:
            nodes (Dict[str, pd.DataFrame]):
                A dictionary of node dataframes.
            rels (Dict[Tuple[str, str, str], pd.DataFrame]):
                A dictionary of relation dataframes.

        Returns:
            pair of dicts:
                A tuple of updated node and relation dictionaries.

        Raises:
            KeyError:
                If a relation is present in `rels` but a node dataframe that
                it refers to is missing from `nodes`.
        """
        # Each entry: relation key, the method updating it, and a mapping from
        # that method's keyword arguments to the node types it needs.
        updaters = (
            (
                ("tweet", "discusses", "claim"),
                self._update_tweet_discusses_claim,
                dict(tweet_df="tweet", claim_df="claim"),
            ),
            (
                ("article", "discusses", "claim"),
                self._update_article_discusses_claim,
                dict(article_df="article", claim_df="claim"),
            ),
            (
                ("user", "follows", "user"),
                self._update_user_follows_user,
                dict(user_df="user"),
            ),
            (
                ("reply", "reply_to", "tweet"),
                self._update_reply_reply_to_tweet,
                dict(reply_df="reply", tweet_df="tweet"),
            ),
            (
                ("reply", "quote_of", "tweet"),
                self._update_reply_quote_of_tweet,
                dict(reply_df="reply", tweet_df="tweet"),
            ),
            (
                ("user", "retweeted", "tweet"),
                self._update_user_retweeted_tweet,
                dict(user_df="user", tweet_df="tweet"),
            ),
        )
        for rel, updater, node_args in updaters:
            if rel in rels:
                node_dfs = {
                    arg: nodes[node_type] for arg, node_type in node_args.items()
                }
                rels[rel] = updater(rel_df=rels[rel], **node_dfs)

        # The ID columns are only dropped after all relations are updated,
        # since the claim and article relations are matched on them.
        for node_type in ("claim", "article"):
            if node_type in nodes:
                nodes[node_type] = self._remove_id_column(node_df=nodes[node_type])

        return nodes, rels

    @staticmethod
    def _reindex_relation(
        rel_df: pd.DataFrame,
        src_df: pd.DataFrame,
        src_id_col: str,
        tgt_df: pd.DataFrame,
        tgt_id_col: str,
    ) -> pd.DataFrame:
        """Replace the raw IDs of a relation with node dataframe index values.

        Args:
            rel_df (pd.DataFrame):
                The relation dataframe, with raw IDs in `src` and `tgt`.
            src_df (pd.DataFrame):
                The node dataframe that the `src` column refers to.
            src_id_col (str):
                The column of `src_df` holding the raw IDs.
            tgt_df (pd.DataFrame):
                The node dataframe that the `tgt` column refers to.
            tgt_id_col (str):
                The column of `tgt_df` holding the raw IDs.

        Returns:
            pd.DataFrame:
                `rel_df` itself if it has no rows. Otherwise a new dataframe
                with `src` and `tgt` columns holding index values, containing
                only the rows whose IDs were found in both node dataframes.
                If no row matches, a dataframe without any columns.
        """
        if len(rel_df) == 0:
            return rel_df

        def build_lookup(node_df: pd.DataFrame, id_col: str, prefix: str):
            # Name both the ID column and the index explicitly, so the lookup
            # has the same two columns whether or not the index is named.
            return (
                node_df[id_col]
                .rename(prefix + "_key")
                .rename_axis(prefix + "_idx")
                .reset_index()
            )

        # Inner merges: relation rows pointing at unknown nodes are discarded.
        merged = (
            rel_df[["src", "tgt"]]
            .astype(np.uint64)
            .merge(
                build_lookup(src_df, src_id_col, "src"),
                left_on="src",
                right_on="src_key",
            )
            .merge(
                build_lookup(tgt_df, tgt_id_col, "tgt"),
                left_on="tgt",
                right_on="tgt_key",
            )
        )
        if len(merged) == 0:
            return pd.DataFrame()

        return pd.DataFrame(
            dict(src=merged["src_idx"].tolist(), tgt=merged["tgt_idx"].tolist())
        )

    def _update_tweet_discusses_claim(
        self, rel_df: pd.DataFrame, tweet_df: pd.DataFrame, claim_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:Tweet)-[:DISCUSSES]->(:Claim) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            tweet_df (pd.DataFrame): The tweet dataframe.
            claim_df (pd.DataFrame): The claim dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=tweet_df,
            src_id_col="tweet_id",
            tgt_df=claim_df,
            tgt_id_col="id",
        )

    def _update_article_discusses_claim(
        self, rel_df: pd.DataFrame, article_df: pd.DataFrame, claim_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:Article)-[:DISCUSSES]->(:Claim) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            article_df (pd.DataFrame): The article dataframe.
            claim_df (pd.DataFrame): The claim dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=article_df,
            src_id_col="id",
            tgt_df=claim_df,
            tgt_id_col="id",
        )

    def _update_user_follows_user(
        self, rel_df: pd.DataFrame, user_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:User)-[:FOLLOWS]->(:User) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            user_df (pd.DataFrame): The user dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=user_df,
            src_id_col="user_id",
            tgt_df=user_df,
            tgt_id_col="user_id",
        )

    def _update_reply_reply_to_tweet(
        self, rel_df: pd.DataFrame, reply_df: pd.DataFrame, tweet_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:Reply)-[:REPLY_TO]->(:Tweet) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            reply_df (pd.DataFrame): The reply dataframe.
            tweet_df (pd.DataFrame): The tweet dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=reply_df,
            src_id_col="tweet_id",
            tgt_df=tweet_df,
            tgt_id_col="tweet_id",
        )

    def _update_reply_quote_of_tweet(
        self, rel_df: pd.DataFrame, reply_df: pd.DataFrame, tweet_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:Reply)-[:QUOTE_OF]->(:Tweet) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            reply_df (pd.DataFrame): The reply dataframe.
            tweet_df (pd.DataFrame): The tweet dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=reply_df,
            src_id_col="tweet_id",
            tgt_df=tweet_df,
            tgt_id_col="tweet_id",
        )

    def _update_user_retweeted_tweet(
        self, rel_df: pd.DataFrame, user_df: pd.DataFrame, tweet_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Update the (:User)-[:RETWEETED]->(:Tweet) relation.

        Args:
            rel_df (pd.DataFrame): The relation dataframe.
            user_df (pd.DataFrame): The user dataframe.
            tweet_df (pd.DataFrame): The tweet dataframe.

        Returns:
            pd.DataFrame: The updated relation dataframe.
        """
        return self._reindex_relation(
            rel_df=rel_df,
            src_df=user_df,
            src_id_col="user_id",
            tgt_df=tweet_df,
            tgt_id_col="tweet_id",
        )

    def _remove_id_column(self, node_df: pd.DataFrame) -> pd.DataFrame:
        """Remove the id column from the node dataframe, if it has one.

        Args:
            node_df (pd.DataFrame): The node dataframe.

        Returns:
            pd.DataFrame: The node dataframe without the id column.
        """
        # `errors="ignore"` covers dataframes that have no `id` column, such
        # as a column-less empty dataframe, without a separate length check.
        return node_df.drop(columns="id", errors="ignore")

Class that updates the IDs of nodes and relations

#   IdUpdator()
#   def update_all( self, nodes: Dict[str, pandas.core.frame.DataFrame], rels: Dict[Tuple[str, str, str], pandas.core.frame.DataFrame] ) -> Tuple[dict, dict]:
View Source
    def update_all(
        self,
        nodes: Dict[str, pd.DataFrame],
        rels: Dict[Tuple[str, str, str], pd.DataFrame],
    ) -> Tuple[dict, dict]:
        """Extract all node and relation data.

        Args:
            nodes (Dict[str, pd.DataFrame]):
                A dictionary of node dataframes.
            rels (Dict[Tuple[str, str, str], pd.DataFrame]):
                A dictionary of relation dataframes.

        Returns:
            pair of dicts:
                A tuple of updated node and relation dictionaries.
        """
        rel = ("tweet", "discusses", "claim")
        if rel in rels.keys():
            rels[rel] = self._update_tweet_discusses_claim(
                rel_df=rels[rel], tweet_df=nodes["tweet"], claim_df=nodes["claim"]
            )

        rel = ("article", "discusses", "claim")
        if rel in rels.keys():
            rels[rel] = self._update_article_discusses_claim(
                rel_df=rels[rel], article_df=nodes["article"], claim_df=nodes["claim"]
            )

        rel = ("user", "follows", "user")
        if rel in rels.keys():
            rels[rel] = self._update_user_follows_user(
                rel_df=rels[rel], user_df=nodes["user"]
            )

        rel = ("reply", "reply_to", "tweet")
        if rel in rels.keys():
            rels[rel] = self._update_reply_reply_to_tweet(
                rel_df=rels[rel], reply_df=nodes["reply"], tweet_df=nodes["tweet"]
            )

        rel = ("reply", "quote_of", "tweet")
        if rel in rels.keys():
            rels[rel] = self._update_reply_quote_of_tweet(
                rel_df=rels[rel], reply_df=nodes["reply"], tweet_df=nodes["tweet"]
            )

        rel = ("user", "retweeted", "tweet")
        if rel in rels.keys():
            rels[rel] = self._update_user_retweeted_tweet(
                rel_df=rels[rel], user_df=nodes["user"], tweet_df=nodes["tweet"]
            )

        # Remove ID columns from the claim and article dataframes
        nodes["claim"] = self._remove_id_column(node_df=nodes["claim"])
        if "article" in nodes.keys():
            nodes["article"] = self._remove_id_column(node_df=nodes["article"])

        return nodes, rels

Update the IDs in all relation dataframes, then remove the ID columns from the claim and article dataframes.

Args
  • nodes (Dict[str, pd.DataFrame]): A dictionary of node dataframes.
  • rels (Dict[Tuple[str, str, str], pd.DataFrame]): A dictionary of relation dataframes.
Returns

pair of dicts: The updated node dictionary and the updated relation dictionary.