126 twitter data (#620)

* Added a script file to process json archive files into more unified parquet files focused on tweet reply rows for further processing.

* Added README file for Twitter data collection.

* Re did code for processing json into standardized parquet files.

* Added file to process parquet files into a conversation tree jsonl file.

* Added requirements and ran pre-commit.
This commit is contained in:
James Mete
2023-01-21 15:54:21 +03:00
committed by GitHub
parent 5f3f0776e9
commit d15d8357a1
4 changed files with 457 additions and 0 deletions
+80
View File
@@ -0,0 +1,80 @@
# Twitter data collection for Open Assistant
Conversations on Twitter can be an interesting and useful source of data for our
model to learn from. Certain twitter threads may contain helpful prompts and
replies, in a similar fashion to how we want our model to be able to respond to
prompts in a useful way.
Thus, these scripts are intended to process twitter data from a variety of
sources, process them into cleaner and more useful formats, and then combine the
various outputs into a unified training set that can be fed to our model as a
conversation, or at least as a prompt with replies.
**Note: Based on issue #126**
## Possible Data Paths
- Twitterstream archive: https://archive.org/details/twitterstream These are
large .tar files with compressed json files inside. However, some data points
such as reply counts seem to always be 0 due to limitations when scraping the
Twitter API.
- Alternative APIs such as snscrape, twint, etc. These alternative APIs often
are harder to use than the official Twitter API but can often bypass API
limits which can make it useful for larger scale data collection. The downside
is potentially slower speed, and less features.
- The official Twitter API
## Currently Completed Items
- Downloaded various archive files (both are .tar, but each have a different
format of json compression. One used .gz, and the other.bz2). Each json file
is roughly 2000 rows of tweets. There are thousands of these compressed json
files. Managing the IO of opening lots of small files is one of the
challenges, which is why future steps will consolidate data into larger easier
to process files.
- Wrote script that can loop through the compressed json files, cleans them up a
bit by removing truncated tweets or tweets that aren't replies. The script
then standardizes the columns, and exports the polars dataframes into parquet
files for future processing. Note: Using polars instead of pandas due to
performance reasons.
- Wrote scripts that process the large dump of tweets into conversation threads
using the tree and node architecture. This results in aroun 17K conversation
threads bassed on a dump of 90M tweets.
- Script can output the conversation threads into a jsonl file for further
filtering or use in models.
## Main Issue
- The issue is that we can easily scrape replies, but there is no guarantee the
original tweet is in the archive file. Furthermore, the archives are large so
they would need to be kept completely in-memory or in a db to reference. We
still need to decide if we want to try to mine the archive to piece together
the conversations, or we can take the list of replied tweets and loop through
those and use alternative apis to fetch the original tweet text, and then
match it with the confirmed replies already in our archive to generate the
prompt/replies data. Currently, my script can extract conversations based on
the dump, but it is a small percentage of the overall dump, and there is no
guarantee of the quality of the tweets.
- The tweet quality is the other major issue. We can get conversations through
the currently made scripts, but they most likely don't match a useful
instruction -> fulfilment. We are trying to filter the tweets through various
means such as matching useful hashtags, or by using cosine similarity against
known instructions.
- The modern Twitter API has conversation_id as a field which can be a way to
gather all tweets in a thread sort of automatically although there is
pagination limits. The main issue with this is it seems hard to search for it
using alternative APIs.
## TODO
- Write scripts to filter existing conversations into useful instructions ->
fulfilment with hashtags or cosine similarity.
- Train model to detect if text is a suitable instruction. This could then be
run through the conversations (or full tweet dump) to simplify the process.
Related to issue #143.
- Write script that matches the original tweets and their text with the archive
data to create the prompt/reply dataset. (Optional)
- Decide on final output format and storage options for the dataset. Currently
in JSONL with tree / node architecture as python dicts which is acceptable I
believe.
- Alternatively: Store processed tweets into DB or alternative option.(Optional)
@@ -0,0 +1,3 @@
numpy==1.21.5
polars==0.15.14
tqdm==4.64.0
@@ -0,0 +1,141 @@
import json
from pathlib import Path
import polars as pl
from tqdm import tqdm
# Sets up paths
# TODO: Source paths from env file
path_string = "PUT THE PATH HERE TO WHERE YOU STORED THE PARQUET FILES"
folder_path = Path(path_string)
processed_folder_path = folder_path / "processed"
output_path = folder_path / "twitter-conv-trees.jsonl"
# Get parq files
parq_files = sorted(processed_folder_path.rglob("*.parquet"))
wanted_cols = [
"timestamp_ms",
"id",
"text",
"truncated",
"in_reply_to_status_id",
"in_reply_to_user_id",
"is_quote_status",
"quote_count",
"reply_count",
"retweet_count",
"favorite_count",
"filter_level",
"lang",
"possibly_sensitive",
"hashtags",
"user_id",
"user_verified",
"user_followers_count",
"user_statuses_count",
]
# Load parqs into list. Using Polars for performance reasons.
df_list = []
for p in parq_files:
df_list.append(pl.read_parquet(p, columns=wanted_cols))
# Create major dataframe.
# This can be done incrementally if RAM is constrained by modifying the above code.
p_df = pl.concat(df_list)
# Clean up the reference just in case to help with memory if needed.
del df_list
# Get tweets that are replies to other tweets
p_df_replies_only = p_df.filter(pl.col("in_reply_to_status_id").is_null().is_not())
# Group by replied to status id to see the most replied to statuses. This can take some time.
p_df_group_reply_to_status = p_df_replies_only.groupby("in_reply_to_status_id").count().sort("count", reverse=True)
# Save output of grouping the top replied to statuses
group_reply_parq = folder_path / "group_reply_parq.parquet"
p_df_group_reply_to_status.write_parquet(group_reply_parq)
# Join the main dataframe with the top replies to find tweets that have replies.
p_join = p_df.join(p_df_group_reply_to_status, left_on="id", right_on="in_reply_to_status_id", how="inner")
# Save output of tweets that have replies
tweets_that_have_replies_path = folder_path / "tweets_that_have_replies.parquet"
p_join.write_parquet(tweets_that_have_replies_path)
# Save output of tweets that are replies to other tweets
tweets_that_are_replies_path = folder_path / "tweets_that_are_replies.parquet"
p_df_replies_only.write_parquet(tweets_that_are_replies_path)
# Filter the tweets that have replies to ones that aren't replies to others.
# Also filter for only english for now.
# This gives the root tweets that have replies but are the start of a conversation.
origin_tweets = p_join.filter((pl.col("in_reply_to_status_id").is_null()) & (pl.col("lang") == "en"))
# Helper functions and classes below for the next steps
def role_decide(user_id, prompt_user):
if user_id == prompt_user:
return "prompter"
else:
return "assistant"
class ConversationTreeNode:
def __init__(self, tweet_id, prompt_user, from_df, children_df, metadata=None):
if metadata:
self.metadata = metadata
else:
self.metadata = from_df.filter(pl.col("id") == tweet_id).to_dicts()[0]
self.metadata["prompt_user"] = prompt_user
self.role = role_decide(self.metadata["user_id"], prompt_user)
self.children = None
self.text = self.metadata["text"]
del self.metadata["text"]
self.get_children(tweet_id=tweet_id, children_df=children_df)
def get_children(self, tweet_id, children_df):
children_dicts = children_df.filter(pl.col("in_reply_to_status_id") == tweet_id).to_dicts()
if len(children_dicts) > 0:
children = [
ConversationTreeNode(
tweet_id=c["id"],
prompt_user=self.metadata["prompt_user"],
from_df=children_df,
children_df=children_df,
metadata=c,
)
for c in children_dicts
]
self.children = children
class ConversationTree:
def __init__(self, tweet_id, prompt_user, from_df, children_df, r_metadata=None):
self.root = ConversationTreeNode(
tweet_id=tweet_id, prompt_user=prompt_user, from_df=from_df, children_df=children_df, metadata=r_metadata
)
self.metadata = None
# Create conversation trees
conv_tree_list = [
ConversationTree(
tweet_id=r["id"], prompt_user=r["user_id"], from_df=origin_tweets, children_df=p_df_replies_only, r_metadata=r
)
for r in tqdm(origin_tweets.to_dicts())
]
# Write conversation trees to jsonl file.
# Might need to clean up the last newline.
with open(output_path, "w") as output:
for t in tqdm(conv_tree_list):
json.dump(obj=t, fp=output, default=lambda x: x.__dict__)
output.write("\n")
@@ -0,0 +1,233 @@
# This file loops through compressed json tweet data, pre-processes them,
# and then extracts them into more unified parquet files that can be handed
# off for further processing. The main focus is on producing viable replies.
# Initial data exploration seems that there is no guarantee that the original
# tweets are in the archive, so we might need to extract suitable replies
# then get the original tweets separately, and then combine them into a
# suitable thread format that can be used by our instruction model.
# This assumes data downloaded from https://archive.org/details/twitterstream
# and that the internal .tar files are extracted locally.
# They are large files so using something like 7Zip or WinRar migth be easier
# than putting all of it in scripts, but it is a possibility.
# I often work in notebooks. If you encounter any issue, please reach out to let me know.
import bz2
import gzip
import json
import pickle
from pathlib import Path
import numpy as np
import polars as pl
from tqdm import tqdm
# TODO: OPTIONAL - Put the Untar process in a script instead of doing that part externally. Twitterstream archives are .tar with folders and json.gz files inside.
# TODO: Set up list of important hashtags & keywords. This might have to be done after we get the original tweets in a separate file.
# TODO: Process data and filter based on hashtags & keywords
# Sets up paths
# TODO: Source paths from env file
path_string = "PUT THE PATH HERE TO WHERE YOU DOWNLOADED AND EXTRACTED THE ARCHIVE .TAR"
folder_path = Path(path_string)
file_list_pkl = folder_path / "file_list.pkl"
processed_file_list_pkl = folder_path / "processed_file_list.pkl"
# For the processed folder to save inside, we can create the directory if it doesn't exist
processed_folder_path = folder_path / "processed"
processed_folder_path.mkdir(parents=True, exist_ok=True)
# Set max buffer to store temporary dataframes for processing
# Change this depending on the memory of your computer
processed_max_buffer = 5000
# Set up list of wanted column names.
# Note: User columns are prefixed with user_
wanted_cols = [
"timestamp_ms",
"id",
"text",
"truncated",
"in_reply_to_status_id",
"in_reply_to_user_id",
"is_quote_status",
"quote_count",
"reply_count",
"retweet_count",
"favorite_count",
"filter_level",
"lang",
"possibly_sensitive",
"hashtags",
"user_id",
"user_verified",
"user_followers_count",
"user_statuses_count",
]
def main(file_list_pkl, folder_path, processed_max_buffer):
"""
Runs the main processing script to get files, loop through them, and process them.
Outputs larger json.gz files made by concat the pre-filtered dataframes from
the original json.gz files.
"""
file_list = get_file_paths(file_list_pkl, folder_path)
process_json(file_list, processed_max_buffer)
print("Done")
def get_file_paths(file_list_pkl, folder_path):
"""
Gets the file paths by recursively checking the folder structure.
# Based on code from stackoverflow https://stackoverflow.com/questions/26835477/pickle-load-variable-if-exists-or-create-and-save-it
"""
try:
allpaths = pickle.load(open(file_list_pkl, "rb"))
except (OSError, IOError) as e:
print(e)
allpaths = sorted(list(folder_path.rglob("*.[gz bz2]*")))
pickle.dump(allpaths, open(file_list_pkl, "wb"))
print("Got file paths.")
return allpaths
def get_processed_list(processed_file_list_pkl):
# Gets processed file list if stored, if not, creates it.
try:
processed_list = pickle.load(open(processed_file_list_pkl, "rb"))
except (OSError, IOError) as e:
print(e)
processed_list = []
pickle.dump(processed_list, open(processed_file_list_pkl, "wb"))
return processed_list
def modify_dict_cols(j_dict):
# Extracting some nested json
j_dict["user_id"] = np.int64(j_dict["user"]["id"])
j_dict["user_followers_count"] = np.int64(j_dict["user"]["followers_count"])
j_dict["user_statuses_count"] = np.int64(j_dict["user"]["statuses_count"])
# Get hashtags as a list of strings
j_dict["hashtags"] = [h["text"] for h in j_dict["entities"]["hashtags"]]
j_dict["id"] = np.int64(j_dict["id"])
try:
j_dict["in_reply_to_status_id"] = np.int64(j_dict["in_reply_to_status_id"])
except Exception as e:
print(e)
j_dict["in_reply_to_status_id"] = j_dict["in_reply_to_status_id"]
try:
j_dict["in_reply_to_user_id"] = np.int64(j_dict["in_reply_to_user_id"])
except Exception as e:
print(e)
j_dict["in_reply_to_user_id"] = j_dict["in_reply_to_user_id"]
# Make sure relevant columns are available or none.
for key in wanted_cols:
if key not in j_dict:
j_dict[key] = None
# Ordering keys and taking wanted columns
j_dict = {key: j_dict[key] for key in wanted_cols}
return j_dict
def process_single_file(f, processed_list):
j_dict_list = []
if f not in processed_list:
# Check for compression type
if f.suffix == ".bz2":
with bz2.BZ2File(f) as file:
for line in file:
# Load JSON
j_dict = json.loads(line)
# Check if user key exists
if "delete" not in j_dict:
if j_dict["truncated"] is False:
j_dict = modify_dict_cols(j_dict)
j_dict_list.append(j_dict)
else:
with gzip.open(f, "r") as file:
for line in file:
# Load JSON
j_dict = json.loads(line)
# Check if user key exists
if "delete" not in j_dict:
if j_dict["truncated"] is False:
j_dict = modify_dict_cols(j_dict)
j_dict_list.append(j_dict)
return j_dict_list
def process_json(file_list, processed_max_buffer):
"""
Loops through file list and loads the compressed
json into a list of dicts after some pre-processing.
Makes sure dicts are ordered in a specific
way to make sure polars can read them.
"""
# Gets processed file list if stored, if not, creates it.
processed_list = get_processed_list(processed_file_list_pkl)
j_list = []
temp_processed_files = []
for i, f in enumerate(tqdm(file_list)):
j_dict_list = process_single_file(f, processed_list)
j_list.extend(j_dict_list)
temp_processed_files.append(f)
if len(temp_processed_files) == processed_max_buffer:
# If we reach our buffer,
# combine into polars dataframe
# and write to parquet as
# a checkpoint
processed_file_name = f"processed_json_{i}.parquet"
processed_file_path = processed_folder_path / processed_file_name
pl.DataFrame(j_list, columns=wanted_cols).write_parquet(processed_file_path)
# Make note of which files have been processed
processed_list.extend(temp_processed_files)
pickle.dump(processed_list, open(processed_file_list_pkl, "wb"))
# Reset buffer lists
j_list = []
temp_processed_files = []
# Process remaining files
processed_file_name = f"processed_json_{i}.parquet"
processed_file_path = processed_folder_path / processed_file_name
pl.from_dicts(j_dict_list).write_parquet(processed_file_path)
processed_list.extend(temp_processed_files)
pickle.dump(processed_list, open(processed_file_list_pkl, "wb"))
j_dict_list = []
temp_processed_files = []
print("Processing completed")
if __name__ == "__main__":
main(file_list_pkl, folder_path, processed_max_buffer)