mirror of
https://github.com/wassname/Open-Assistant.git
synced 2026-06-27 16:10:30 +08:00
d15d8357a1
* Added a script file to process json archive files into more unified parquet files focused on tweet reply rows for further processing. * Added README file for Twitter data collection. * Redid the code for processing json into standardized parquet files. * Added file to process parquet files into a conversation tree jsonl file. * Added requirements and ran pre-commit.
142 lines
4.6 KiB
Python
142 lines
4.6 KiB
Python
import json
|
|
from pathlib import Path
|
|
|
|
import polars as pl
|
|
from tqdm import tqdm
|
|
|
|
# Input / output locations. Everything is derived from ``path_string``.
# TODO: Source paths from env file
path_string = "PUT THE PATH HERE TO WHERE YOU STORED THE PARQUET FILES"
folder_path = Path(path_string)
processed_folder_path = folder_path.joinpath("processed")
output_path = folder_path.joinpath("twitter-conv-trees.jsonl")

# Every parquet file below the "processed" folder (searched recursively),
# in sorted path order.
parq_files = sorted(processed_folder_path.rglob("*.parquet"))
|
|
|
|
# Columns requested when reading each parquet file (passed as ``columns=``
# to ``pl.read_parquet``). Order is kept exactly as before.
wanted_cols = [
    "timestamp_ms", "id", "text", "truncated",
    "in_reply_to_status_id", "in_reply_to_user_id", "is_quote_status",
    "quote_count", "reply_count", "retweet_count", "favorite_count",
    "filter_level", "lang", "possibly_sensitive", "hashtags",
    "user_id", "user_verified", "user_followers_count", "user_statuses_count",
]
|
|
|
|
# Load parqs into list. Using Polars for performance reasons.
# Fail early with a clear message when no input was found (for example when
# ``path_string`` still holds its placeholder value) rather than failing
# later inside ``pl.concat`` on an empty list.
if not parq_files:
    raise FileNotFoundError(f"No parquet files found under {processed_folder_path}")

# Only the columns listed in ``wanted_cols`` are read from each file.
df_list = [pl.read_parquet(p, columns=wanted_cols) for p in parq_files]

# Create major dataframe.
# This can be done incrementally if RAM is constrained by modifying the above code.
p_df = pl.concat(df_list)

# Clean up the reference just in case to help with memory if needed.
del df_list
|
|
|
|
# Get tweets that are replies to other tweets.
# ``is_not_null()`` is used instead of ``is_null().is_not()``: ``Expr.is_not``
# is deprecated/removed in later Polars releases, while ``is_not_null`` is
# available in both old and new versions and selects the same rows.
p_df_replies_only = p_df.filter(pl.col("in_reply_to_status_id").is_not_null())

# Group by replied to status id to see the most replied to statuses. This can take some time.
# The result has one row per ``in_reply_to_status_id`` with a ``count`` column,
# sorted with the highest count first.
# NOTE(review): ``groupby`` / ``reverse=`` are the spellings of older Polars
# releases; confirm against the pinned Polars version before renaming them.
p_df_group_reply_to_status = p_df_replies_only.groupby("in_reply_to_status_id").count().sort("count", reverse=True)

# Save output of grouping the top replied to statuses
group_reply_parq = folder_path / "group_reply_parq.parquet"
p_df_group_reply_to_status.write_parquet(group_reply_parq)

# Join the main dataframe with the top replies to find tweets that have replies.
# Inner join: only tweets whose ``id`` appears as some reply's
# ``in_reply_to_status_id`` are kept.
p_join = p_df.join(p_df_group_reply_to_status, left_on="id", right_on="in_reply_to_status_id", how="inner")

# Save output of tweets that have replies
tweets_that_have_replies_path = folder_path / "tweets_that_have_replies.parquet"
p_join.write_parquet(tweets_that_have_replies_path)

# Save output of tweets that are replies to other tweets
tweets_that_are_replies_path = folder_path / "tweets_that_are_replies.parquet"
p_df_replies_only.write_parquet(tweets_that_are_replies_path)

# Filter the tweets that have replies to ones that aren't replies to others.
# Also filter for only english for now.
# This gives the root tweets that have replies but are the start of a conversation.
origin_tweets = p_join.filter((pl.col("in_reply_to_status_id").is_null()) & (pl.col("lang") == "en"))
|
|
|
|
|
|
# Helper functions and classes below for the next steps
|
|
|
|
|
|
def role_decide(user_id, prompt_user):
    """Return the conversation role of ``user_id``.

    The role is ``"prompter"`` when ``user_id`` equals ``prompt_user`` and
    ``"assistant"`` for any other user.
    """
    return "prompter" if user_id == prompt_user else "assistant"
|
|
|
|
|
|
class ConversationTreeNode:
    """One tweet in a conversation tree, together with its reply subtree.

    Instance attributes, in assignment order (the order matters because the
    trees are serialized through ``__dict__``):

    * ``metadata`` - row dict of the tweet plus ``prompt_user``, without ``text``
    * ``role`` - ``"prompter"`` or ``"assistant"`` (see ``role_decide``)
    * ``children`` - list of reply nodes, or ``None`` when there are none
    * ``text`` - the tweet text taken out of the row dict
    """

    def __init__(self, tweet_id, prompt_user, from_df, children_df, metadata=None):
        """Build the node for ``tweet_id`` and, recursively, all of its replies.

        Args:
            tweet_id: ``id`` value of the tweet this node represents.
            prompt_user: user id that counts as the prompter; tweets by any
                other user get the ``"assistant"`` role.
            from_df: dataframe searched for the row with ``id == tweet_id``
                when ``metadata`` is not supplied.
            children_df: dataframe whose ``in_reply_to_status_id`` column is
                matched against tweet ids to find replies.
            metadata: optional row dict for the tweet. When given (and
                non-empty) ``from_df`` is not consulted.

        Raises:
            IndexError: if ``metadata`` is not supplied and ``from_df`` has no
                row with ``id == tweet_id``.
        """
        if metadata:
            # Work on a copy: the dict is edited below and the caller's
            # object must not lose its "text" key or gain "prompt_user".
            self.metadata = dict(metadata)
        else:
            rows = from_df.filter(pl.col("id") == tweet_id).to_dicts()
            if not rows:
                raise IndexError(f"tweet id {tweet_id!r} not found in from_df")
            self.metadata = rows[0]

        self.metadata["prompt_user"] = prompt_user
        self.role = role_decide(self.metadata["user_id"], prompt_user)
        self.children = None
        # The text is stored as its own attribute instead of inside metadata.
        self.text = self.metadata.pop("text")
        self.get_children(tweet_id=tweet_id, children_df=children_df)

    def get_children(self, tweet_id, children_df):
        """Set ``self.children`` to nodes for the direct replies to ``tweet_id``.

        ``self.children`` is left untouched when ``children_df`` holds no row
        whose ``in_reply_to_status_id`` equals ``tweet_id``. Each child node
        repeats the lookup for its own id, so the whole reply subtree is
        built recursively; every lookup filters the full ``children_df``.
        """
        children_dicts = children_df.filter(pl.col("in_reply_to_status_id") == tweet_id).to_dicts()
        if children_dicts:
            self.children = [
                ConversationTreeNode(
                    tweet_id=c["id"],
                    prompt_user=self.metadata["prompt_user"],
                    from_df=children_df,
                    children_df=children_df,
                    metadata=c,
                )
                for c in children_dicts
            ]
|
|
|
|
|
|
class ConversationTree:
    """A whole conversation: a root ``ConversationTreeNode`` plus tree-level metadata."""

    def __init__(self, tweet_id, prompt_user, from_df, children_df, r_metadata=None):
        """Build the tree whose root is the tweet ``tweet_id``.

        All arguments are forwarded to ``ConversationTreeNode``;
        ``r_metadata`` is passed as the root node's ``metadata``.
        ``self.metadata`` is initialised to ``None``.
        """
        root_node = ConversationTreeNode(
            tweet_id=tweet_id,
            prompt_user=prompt_user,
            from_df=from_df,
            children_df=children_df,
            metadata=r_metadata,
        )
        self.root = root_node
        self.metadata = None
|
|
|
|
|
|
def _attrs_of(obj):
    """Fallback for ``json.dump``: encode a tree/node object as its attribute dict."""
    return obj.__dict__


# Create conversation trees: one tree per origin tweet. The author of the
# origin tweet is the prompt user for the whole tree.
conv_tree_list = [
    ConversationTree(
        tweet_id=row["id"],
        prompt_user=row["user_id"],
        from_df=origin_tweets,
        children_df=p_df_replies_only,
        r_metadata=row,
    )
    for row in tqdm(origin_tweets.to_dicts())
]

# Write conversation trees to jsonl file, one JSON document per line.
# Every line, including the last one, ends with a newline;
# that final newline might need to be cleaned up.
with open(output_path, "w") as output:
    for tree in tqdm(conv_tree_list):
        json.dump(obj=tree, fp=output, default=_attrs_of)
        output.write("\n")
|