diff --git a/scripts/data-collection/twitter/README.md b/scripts/data-collection/twitter/README.md new file mode 100644 index 00000000..b8c7bfc9 --- /dev/null +++ b/scripts/data-collection/twitter/README.md @@ -0,0 +1,80 @@ +# Twitter data collection for Open Assistant + +Conversations on Twitter can be an interesting and useful source of data for our +model to learn from. Certain Twitter threads may contain helpful prompts and +replies, in a similar fashion to how we want our model to be able to respond to +prompts in a useful way. + +Thus, these scripts are intended to process Twitter data from a variety of +sources, process them into cleaner and more useful formats, and then combine the +various outputs into a unified training set that can be fed to our model as a +conversation, or at least as a prompt with replies. + +**Note: Based on issue #126** + +## Possible Data Paths + +- Twitterstream archive: https://archive.org/details/twitterstream These are + large .tar files with compressed json files inside. However, some data points + such as reply counts seem to always be 0 due to limitations when scraping the + Twitter API. +- Alternative APIs such as snscrape, twint, etc. These alternative APIs often + are harder to use than the official Twitter API but can often bypass API + limits, which can make them useful for larger scale data collection. The downside + is potentially slower speed and fewer features. +- The official Twitter API + +## Currently Completed Items + +- Downloaded various archive files (both are .tar, but each has a different + format of json compression. One used .gz, and the other .bz2). Each json file + is roughly 2000 rows of tweets. There are thousands of these compressed json + files. Managing the IO of opening lots of small files is one of the + challenges, which is why future steps will consolidate data into larger, easier + to process files.
+- Wrote a script that loops through the compressed json files and cleans them up a + bit by removing truncated tweets or tweets that aren't replies. The script + then standardizes the columns, and exports the polars dataframes into parquet + files for future processing. Note: Using polars instead of pandas due to + performance reasons. +- Wrote scripts that process the large dump of tweets into conversation threads + using the tree and node architecture. This results in around 17K conversation + threads based on a dump of 90M tweets. +- Script can output the conversation threads into a jsonl file for further + filtering or use in models. + +## Main Issue + +- The issue is that we can easily scrape replies, but there is no guarantee the + original tweet is in the archive file. Furthermore, the archives are large so + they would need to be kept completely in-memory or in a db to reference. We + still need to decide if we want to try to mine the archive to piece together + the conversations, or we can take the list of replied tweets and loop through + those and use alternative APIs to fetch the original tweet text, and then + match it with the confirmed replies already in our archive to generate the + prompt/replies data. Currently, my script can extract conversations based on + the dump, but it is a small percentage of the overall dump, and there is no + guarantee of the quality of the tweets. +- The tweet quality is the other major issue. We can get conversations through + the currently made scripts, but they most likely don't match a useful + instruction -> fulfilment. We are trying to filter the tweets through various + means such as matching useful hashtags, or by using cosine similarity against + known instructions. +- The modern Twitter API has conversation_id as a field which can be a way to + gather all tweets in a thread sort of automatically, although there are + pagination limits.
The main issue with this is that it seems hard to search for it + using alternative APIs. + +## TODO + +- Write scripts to filter existing conversations into useful instructions -> + fulfilment with hashtags or cosine similarity. +- Train model to detect if text is a suitable instruction. This could then be + run through the conversations (or full tweet dump) to simplify the process. + Related to issue #143. +- Write script that matches the original tweets and their text with the archive + data to create the prompt/reply dataset. (Optional) +- Decide on final output format and storage options for the dataset. Currently + in JSONL with tree / node architecture as python dicts which is acceptable I + believe. +- Alternatively: Store processed tweets into DB or alternative option. (Optional) diff --git a/scripts/data-collection/twitter/requirements.txt b/scripts/data-collection/twitter/requirements.txt new file mode 100644 index 00000000..2b084674 --- /dev/null +++ b/scripts/data-collection/twitter/requirements.txt @@ -0,0 +1,3 @@ +numpy==1.21.5 +polars==0.15.14 +tqdm==4.64.0 diff --git a/scripts/data-collection/twitter/twitter_create_convs.py b/scripts/data-collection/twitter/twitter_create_convs.py new file mode 100644 index 00000000..b1fd709a --- /dev/null +++ b/scripts/data-collection/twitter/twitter_create_convs.py @@ -0,0 +1,141 @@ +import json +from pathlib import Path + +import polars as pl +from tqdm import tqdm + +# Sets up paths +# TODO: Source paths from env file +path_string = "PUT THE PATH HERE TO WHERE YOU STORED THE PARQUET FILES" +folder_path = Path(path_string) +processed_folder_path = folder_path / "processed" +output_path = folder_path / "twitter-conv-trees.jsonl" + +# Get parquet files +parq_files = sorted(processed_folder_path.rglob("*.parquet")) + +wanted_cols = [ + "timestamp_ms", + "id", + "text", + "truncated", + "in_reply_to_status_id", + "in_reply_to_user_id", + "is_quote_status", + "quote_count", + "reply_count", + "retweet_count",
+ "favorite_count", + "filter_level", + "lang", + "possibly_sensitive", + "hashtags", + "user_id", + "user_verified", + "user_followers_count", + "user_statuses_count", +] + +# Load parquet files into a list. Using Polars for performance reasons. +df_list = [] +for p in parq_files: + df_list.append(pl.read_parquet(p, columns=wanted_cols)) + +# Create major dataframe. +# This can be done incrementally if RAM is constrained by modifying the above code. +p_df = pl.concat(df_list) + +# Clean up the reference just in case to help with memory if needed. +del df_list + +# Get tweets that are replies to other tweets +p_df_replies_only = p_df.filter(pl.col("in_reply_to_status_id").is_null().is_not()) + +# Group by replied to status id to see the most replied to statuses. This can take some time. +p_df_group_reply_to_status = p_df_replies_only.groupby("in_reply_to_status_id").count().sort("count", reverse=True) + +# Save output of grouping the top replied to statuses +group_reply_parq = folder_path / "group_reply_parq.parquet" +p_df_group_reply_to_status.write_parquet(group_reply_parq) + +# Join the main dataframe with the top replies to find tweets that have replies. +p_join = p_df.join(p_df_group_reply_to_status, left_on="id", right_on="in_reply_to_status_id", how="inner") + +# Save output of tweets that have replies +tweets_that_have_replies_path = folder_path / "tweets_that_have_replies.parquet" +p_join.write_parquet(tweets_that_have_replies_path) + +# Save output of tweets that are replies to other tweets +tweets_that_are_replies_path = folder_path / "tweets_that_are_replies.parquet" +p_df_replies_only.write_parquet(tweets_that_are_replies_path) + +# Filter the tweets that have replies to ones that aren't replies to others. +# Also filter for only English for now. +# This gives the root tweets that have replies but are the start of a conversation.
origin_tweets = p_join.filter((pl.col("in_reply_to_status_id").is_null()) & (pl.col("lang") == "en"))


# Helper functions and classes below for the next steps


def role_decide(user_id, prompt_user):
    """Return the conversation role of a tweet's author.

    Args:
        user_id: Id of the user who wrote the tweet.
        prompt_user: Id of the user who wrote the root tweet of the thread.

    Returns:
        "prompter" when the author is the thread starter, otherwise "assistant".
    """
    return "prompter" if user_id == prompt_user else "assistant"


class ConversationTreeNode:
    """One tweet in a conversation tree, plus (recursively) all of its replies.

    Attributes are assigned in a fixed order (metadata, role, children, text)
    because the trees are serialized through ``__dict__`` further down, so the
    assignment order is also the key order of the JSON output.
    """

    def __init__(self, tweet_id, prompt_user, from_df, children_df, metadata=None):
        """Build the node for ``tweet_id`` and recursively attach its replies.

        Args:
            tweet_id: Id of the tweet this node represents.
            prompt_user: Id of the user who wrote the root tweet of the thread.
            from_df: Dataframe searched for the tweet row when ``metadata`` is
                not supplied (or is empty).
            children_df: Dataframe of reply tweets, searched by
                ``in_reply_to_status_id`` to find this node's children.
            metadata: Optional dict with the tweet's row. It is copied, so the
                caller's dict is left untouched.

        Raises:
            IndexError: If ``metadata`` is not given and ``tweet_id`` is not
                present in ``from_df``.
        """
        if metadata:
            # Copy first: the keys added/removed below must not leak back into
            # the dict owned by the caller.
            self.metadata = dict(metadata)
        else:
            self.metadata = from_df.filter(pl.col("id") == tweet_id).to_dicts()[0]

        self.metadata["prompt_user"] = prompt_user
        self.role = role_decide(self.metadata["user_id"], prompt_user)
        self.children = None
        # The text lives on the node itself, so it is moved out of the metadata.
        self.text = self.metadata.pop("text")
        self.get_children(tweet_id=tweet_id, children_df=children_df)

    def get_children(self, tweet_id, children_df):
        """Attach one child node per reply to ``tweet_id`` found in ``children_df``.

        ``self.children`` stays ``None`` when the tweet has no replies.
        Each call filters the whole ``children_df`` and recurses once per reply.
        """
        children_dicts = children_df.filter(pl.col("in_reply_to_status_id") == tweet_id).to_dicts()
        if children_dicts:
            self.children = [
                ConversationTreeNode(
                    tweet_id=c["id"],
                    prompt_user=self.metadata["prompt_user"],
                    from_df=children_df,
                    children_df=children_df,
                    metadata=c,
                )
                for c in children_dicts
            ]


class ConversationTree:
    """A whole conversation: a root tweet node plus tree-level metadata."""

    def __init__(self, tweet_id, prompt_user, from_df, children_df, r_metadata=None):
        """Build the tree rooted at ``tweet_id``.

        Args:
            tweet_id: Id of the root tweet.
            prompt_user: Id of the user who wrote the root tweet.
            from_df: Dataframe holding the root tweets.
            children_df: Dataframe holding the reply tweets.
            r_metadata: Optional dict with the root tweet's row.
        """
        self.root = ConversationTreeNode(
            tweet_id=tweet_id, prompt_user=prompt_user, from_df=from_df, children_df=children_df, metadata=r_metadata
        )
        self.metadata = None


# Create conversation trees
conv_tree_list = [
    ConversationTree(
        tweet_id=r["id"], prompt_user=r["user_id"], from_df=origin_tweets, children_df=p_df_replies_only, r_metadata=r
    )
    for r in tqdm(origin_tweets.to_dicts())
]

# Write conversation trees to jsonl file.
# Might need to clean up the last newline.
+with open(output_path, "w") as output: + for t in tqdm(conv_tree_list): + json.dump(obj=t, fp=output, default=lambda x: x.__dict__) + output.write("\n") diff --git a/scripts/data-collection/twitter/twitter_process_json.py b/scripts/data-collection/twitter/twitter_process_json.py new file mode 100644 index 00000000..a1e47d7d --- /dev/null +++ b/scripts/data-collection/twitter/twitter_process_json.py @@ -0,0 +1,233 @@ +# This file loops through compressed json tweet data, pre-processes them, +# and then extracts them into more unified parquet files that can be handed +# off for further processing. The main focus is on producing viable replies. + +# Initial data exploration suggests that there is no guarantee that the original +# tweets are in the archive, so we might need to extract suitable replies +# then get the original tweets separately, and then combine them into a +# suitable thread format that can be used by our instruction model. + +# This assumes data downloaded from https://archive.org/details/twitterstream +# and that the internal .tar files are extracted locally. +# They are large files so using something like 7Zip or WinRar might be easier +# than putting all of it in scripts, but it is a possibility. + +# I often work in notebooks. If you encounter any issue, please reach out to let me know. + +import bz2 +import gzip +import json +import pickle +from pathlib import Path + +import numpy as np +import polars as pl +from tqdm import tqdm + +# TODO: OPTIONAL - Put the Untar process in a script instead of doing that part externally. Twitterstream archives are .tar with folders and json.gz files inside. +# TODO: Set up list of important hashtags & keywords. This might have to be done after we get the original tweets in a separate file.
# TODO: Process data and filter based on hashtags & keywords

# Sets up paths
# TODO: Source paths from env file
path_string = "PUT THE PATH HERE TO WHERE YOU DOWNLOADED AND EXTRACTED THE ARCHIVE .TAR"
folder_path = Path(path_string)
file_list_pkl = folder_path / "file_list.pkl"
processed_file_list_pkl = folder_path / "processed_file_list.pkl"

# For the processed folder to save inside, we can create the directory if it doesn't exist
processed_folder_path = folder_path / "processed"
processed_folder_path.mkdir(parents=True, exist_ok=True)

# Set max buffer to store temporary dataframes for processing
# Change this depending on the memory of your computer
processed_max_buffer = 5000

# Set up list of wanted column names.
# Note: User columns are prefixed with user_
wanted_cols = [
    "timestamp_ms",
    "id",
    "text",
    "truncated",
    "in_reply_to_status_id",
    "in_reply_to_user_id",
    "is_quote_status",
    "quote_count",
    "reply_count",
    "retweet_count",
    "favorite_count",
    "filter_level",
    "lang",
    "possibly_sensitive",
    "hashtags",
    "user_id",
    "user_verified",
    "user_followers_count",
    "user_statuses_count",
]

# File suffixes of the compressed tweet dumps this script can read.
_COMPRESSED_SUFFIXES = (".gz", ".bz2")


def main(file_list_pkl, folder_path, processed_max_buffer):
    """Collect the compressed tweet files under ``folder_path`` and process them.

    The pre-filtered tweets of many small compressed json files are
    concatenated and written out as larger parquet files.

    Args:
        file_list_pkl: Path of the pickle file caching the list of input files.
        folder_path: Root folder of the extracted archive.
        processed_max_buffer: Number of input files combined into one parquet file.
    """
    file_list = get_file_paths(file_list_pkl, folder_path)

    process_json(file_list, processed_max_buffer)

    print("Done")


def get_file_paths(file_list_pkl, folder_path):
    """Return the sorted list of compressed tweet files under ``folder_path``.

    The list is cached in ``file_list_pkl``; when the cache cannot be read the
    folder is scanned recursively and the cache is (re)written.

    Based on code from stackoverflow
    https://stackoverflow.com/questions/26835477/pickle-load-variable-if-exists-or-create-and-save-it

    Args:
        file_list_pkl: Path of the pickle cache file.
        folder_path: ``pathlib.Path`` of the folder to scan.

    Returns:
        Sorted list of ``Path`` objects whose suffix is ``.gz`` or ``.bz2``.
    """
    try:
        # NOTE: pickle must only ever be used on files written by this script;
        # unpickling untrusted data can execute arbitrary code.
        with open(file_list_pkl, "rb") as cache:
            allpaths = pickle.load(cache)
    except (OSError, EOFError) as e:
        # Missing, unreadable or empty cache: rebuild it from the folder contents.
        print(e)
        # Filter on the real suffix. The former glob "*.[gz bz2]*" was a
        # character class, so it also matched e.g. ".zip" files and directories.
        allpaths = sorted(
            p for p in folder_path.rglob("*") if p.suffix in _COMPRESSED_SUFFIXES and p.is_file()
        )
        with open(file_list_pkl, "wb") as cache:
            pickle.dump(allpaths, cache)
    print("Got file paths.")
    return allpaths


def get_processed_list(processed_file_list_pkl):
    """Return the list of already processed files, creating the cache if needed.

    Args:
        processed_file_list_pkl: Path of the pickle file storing the list.

    Returns:
        The stored list, or a new empty list when the cache cannot be read.
    """
    try:
        # NOTE: same pickle caveat as in get_file_paths: trusted, self-written files only.
        with open(processed_file_list_pkl, "rb") as cache:
            processed_list = pickle.load(cache)
    except (OSError, EOFError) as e:
        print(e)
        processed_list = []
        with open(processed_file_list_pkl, "wb") as cache:
            pickle.dump(processed_list, cache)
    return processed_list


def _to_int64_or_keep(value):
    """Convert ``value`` to ``np.int64``; return it unchanged when that is impossible."""
    if value is None:
        # Tweets that are not replies carry null reply ids; nothing to convert.
        return None
    try:
        return np.int64(value)
    except (TypeError, ValueError, OverflowError):
        return value


def modify_dict_cols(j_dict):
    """Flatten one raw tweet dict into a dict holding exactly ``wanted_cols``.

    Nested user fields are lifted to ``user_``-prefixed keys, hashtags become a
    list of strings, ids are converted to ``np.int64`` and any wanted column
    that is absent is filled with ``None``. The input dict is not modified.

    Args:
        j_dict: Tweet dict with at least "id", a "user" dict ("id",
            "followers_count", "statuses_count") and "entities" -> "hashtags".

    Returns:
        A new dict whose keys are ``wanted_cols``, in that order.

    Raises:
        KeyError: If one of the required keys listed above is missing.
    """
    user = j_dict["user"]
    derived = {
        "id": np.int64(j_dict["id"]),
        "user_id": np.int64(user["id"]),
        "user_followers_count": np.int64(user["followers_count"]),
        "user_statuses_count": np.int64(user["statuses_count"]),
        # Get hashtags as a list of strings
        "hashtags": [h["text"] for h in j_dict["entities"]["hashtags"]],
        "in_reply_to_status_id": _to_int64_or_keep(j_dict.get("in_reply_to_status_id")),
        "in_reply_to_user_id": _to_int64_or_keep(j_dict.get("in_reply_to_user_id")),
    }
    # "user_verified" is a wanted column but was never lifted out of the nested
    # user object, so it used to be None for every tweet.
    if "user_verified" not in j_dict:
        derived["user_verified"] = user.get("verified")

    # Ordering keys and taking wanted columns; missing ones become None.
    return {key: derived[key] if key in derived else j_dict.get(key) for key in wanted_cols}


def _open_compressed(f):
    """Open ``f`` for binary reading, as bz2 for ".bz2" files and as gzip otherwise."""
    if f.suffix == ".bz2":
        return bz2.open(f, "rb")
    return gzip.open(f, "rb")


def process_single_file(f, processed_list):
    """Read one compressed json-lines file and return its usable tweets.

    Args:
        f: ``pathlib.Path`` of a ".bz2" or gzip compressed json-lines file.
        processed_list: Container of files that were already processed; when
            ``f`` is in it the file is not read at all.

    Returns:
        List of dicts produced by ``modify_dict_cols``. Records containing a
        "delete" key and records whose "truncated" flag is not ``False`` are
        skipped.
    """
    if f in processed_list:
        return []

    j_dict_list = []
    with _open_compressed(f) as file:
        for line in file:
            if not line.strip():
                continue
            j_dict = json.loads(line)
            # .get() so records without a "truncated" key are skipped
            # instead of aborting the whole run with a KeyError.
            if "delete" in j_dict or j_dict.get("truncated") is not False:
                continue
            j_dict_list.append(modify_dict_cols(j_dict))

    return j_dict_list


def _write_checkpoint(index, rows, batch_files, processed_list):
    """Write one batch of rows to parquet and record its source files as processed."""
    if rows:
        # An empty batch is not written, so it can never replace an existing
        # checkpoint file with an empty one.
        processed_file_path = processed_folder_path / f"processed_json_{index}.parquet"
        pl.DataFrame(rows, columns=wanted_cols).write_parquet(processed_file_path)

    # Make note of which files have been processed
    processed_list.extend(batch_files)
    with open(processed_file_list_pkl, "wb") as cache:
        pickle.dump(processed_list, cache)


def process_json(file_list, processed_max_buffer):
    """Process every file in ``file_list`` and write the tweets to parquet in batches.

    After every ``processed_max_buffer`` newly processed files the accumulated
    tweets are written to ``processed_json_<index>.parquet`` (index of the last
    file in the batch) and the files are recorded in the processed-file list.
    Files already recorded there are skipped, and the final partial batch is
    flushed at the end.

    Args:
        file_list: Paths of the compressed json files to process.
        processed_max_buffer: Number of files combined into one parquet file.
    """
    # Gets processed file list if stored, if not, creates it.
    processed_list = get_processed_list(processed_file_list_pkl)
    # Set for O(1) membership tests; the list itself is what gets persisted.
    already_done = set(processed_list)

    j_list = []
    temp_processed_files = []
    batch_end = None

    for i, f in enumerate(tqdm(file_list)):
        if f in already_done:
            continue

        j_list.extend(process_single_file(f, already_done))
        temp_processed_files.append(f)
        batch_end = i

        if len(temp_processed_files) >= processed_max_buffer:
            # Buffer is full: write a checkpoint and start a new batch.
            _write_checkpoint(batch_end, j_list, temp_processed_files, processed_list)
            j_list = []
            temp_processed_files = []

    # Process remaining files: flush the whole accumulated buffer, not just
    # the rows of the last file.
    if temp_processed_files:
        _write_checkpoint(batch_end, j_list, temp_processed_files, processed_list)

    print("Processing completed")


if __name__ == "__main__":
    main(file_list_pkl, folder_path, processed_max_buffer)