Files
James Mete d15d8357a1 126 twitter data (#620)
* Added a script file to process json archive files into more unified parquet files focused on tweet reply rows for further processing.

* Added README file for Twitter data collection.

* Re did code for processing json into standardized parquet files.

* Added file to process parquet files into a conversation tree jsonl file.

* Added requirements and ran pre-commit.
2023-01-21 13:54:21 +01:00

142 lines
4.6 KiB
Python

import json
from pathlib import Path
import polars as pl
from tqdm import tqdm
# Sets up paths
# TODO: Source paths from env file
path_string = "PUT THE PATH HERE TO WHERE YOU STORED THE PARQUET FILES"
folder_path = Path(path_string)
processed_folder_path = folder_path / "processed"
output_path = folder_path / "twitter-conv-trees.jsonl"
# Get parq files
parq_files = sorted(processed_folder_path.rglob("*.parquet"))
wanted_cols = [
"timestamp_ms",
"id",
"text",
"truncated",
"in_reply_to_status_id",
"in_reply_to_user_id",
"is_quote_status",
"quote_count",
"reply_count",
"retweet_count",
"favorite_count",
"filter_level",
"lang",
"possibly_sensitive",
"hashtags",
"user_id",
"user_verified",
"user_followers_count",
"user_statuses_count",
]
# Load parqs into list. Using Polars for performance reasons.
df_list = []
for p in parq_files:
df_list.append(pl.read_parquet(p, columns=wanted_cols))
# Create major dataframe.
# This can be done incrementally if RAM is constrained by modifying the above code.
p_df = pl.concat(df_list)
# Clean up the reference just in case to help with memory if needed.
del df_list
# Get tweets that are replies to other tweets
p_df_replies_only = p_df.filter(pl.col("in_reply_to_status_id").is_null().is_not())
# Group by replied to status id to see the most replied to statuses. This can take some time.
p_df_group_reply_to_status = p_df_replies_only.groupby("in_reply_to_status_id").count().sort("count", reverse=True)
# Save output of grouping the top replied to statuses
group_reply_parq = folder_path / "group_reply_parq.parquet"
p_df_group_reply_to_status.write_parquet(group_reply_parq)
# Join the main dataframe with the top replies to find tweets that have replies.
p_join = p_df.join(p_df_group_reply_to_status, left_on="id", right_on="in_reply_to_status_id", how="inner")
# Save output of tweets that have replies
tweets_that_have_replies_path = folder_path / "tweets_that_have_replies.parquet"
p_join.write_parquet(tweets_that_have_replies_path)
# Save output of tweets that are replies to other tweets
tweets_that_are_replies_path = folder_path / "tweets_that_are_replies.parquet"
p_df_replies_only.write_parquet(tweets_that_are_replies_path)
# Filter the tweets that have replies to ones that aren't replies to others.
# Also filter for only english for now.
# This gives the root tweets that have replies but are the start of a conversation.
origin_tweets = p_join.filter((pl.col("in_reply_to_status_id").is_null()) & (pl.col("lang") == "en"))
# Helper functions and classes below for the next steps
def role_decide(user_id, prompt_user):
if user_id == prompt_user:
return "prompter"
else:
return "assistant"
class ConversationTreeNode:
def __init__(self, tweet_id, prompt_user, from_df, children_df, metadata=None):
if metadata:
self.metadata = metadata
else:
self.metadata = from_df.filter(pl.col("id") == tweet_id).to_dicts()[0]
self.metadata["prompt_user"] = prompt_user
self.role = role_decide(self.metadata["user_id"], prompt_user)
self.children = None
self.text = self.metadata["text"]
del self.metadata["text"]
self.get_children(tweet_id=tweet_id, children_df=children_df)
def get_children(self, tweet_id, children_df):
children_dicts = children_df.filter(pl.col("in_reply_to_status_id") == tweet_id).to_dicts()
if len(children_dicts) > 0:
children = [
ConversationTreeNode(
tweet_id=c["id"],
prompt_user=self.metadata["prompt_user"],
from_df=children_df,
children_df=children_df,
metadata=c,
)
for c in children_dicts
]
self.children = children
class ConversationTree:
def __init__(self, tweet_id, prompt_user, from_df, children_df, r_metadata=None):
self.root = ConversationTreeNode(
tweet_id=tweet_id, prompt_user=prompt_user, from_df=from_df, children_df=children_df, metadata=r_metadata
)
self.metadata = None
# Create conversation trees
conv_tree_list = [
ConversationTree(
tweet_id=r["id"], prompt_user=r["user_id"], from_df=origin_tweets, children_df=p_df_replies_only, r_metadata=r
)
for r in tqdm(origin_tweets.to_dicts())
]
# Write conversation trees to jsonl file.
# Might need to clean up the last newline.
with open(output_path, "w") as output:
for t in tqdm(conv_tree_list):
json.dump(obj=t, fp=output, default=lambda x: x.__dict__)
output.write("\n")