# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.3.0
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done Solving environment: done ==> WARNING: A newer version of conda exists. <== current version: 23.3.1 latest version: 23.11.0 Please update conda by running $ conda update -n base -c defaults conda Or to minimize the number of packages updated during conda update use conda install conda=23.11.0 # All requested packages already installed. Note: you may need to restart the kernel to use updated packages. Requirement already satisfied: pyspark==3.3.0 in /opt/conda/lib/python3.10/site-packages (3.3.0) Requirement already satisfied: py4j==0.10.9.5 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.3.0) (0.10.9.5) WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv [notice] A new release of pip is available: 23.2.1 -> 23.3.1 [notice] To update, run: pip install --upgrade pip Note: you may need to restart the kernel to use updated packages.
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("PySparkApp")
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider",
)
.getOrCreate()
)
print(spark.version)
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache The jars for the packages stored in: /root/.ivy2/jars org.apache.hadoop#hadoop-aws added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-fef7c7d6-71b0-4951-a075-e07aedf2adcf;1.0 confs: [default] found org.apache.hadoop#hadoop-aws;3.2.2 in central found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central :: resolution report :: resolve 369ms :: artifacts dl 20ms :: modules in use: com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default] org.apache.hadoop#hadoop-aws;3.2.2 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 2 | 0 | 0 | 0 || 2 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-fef7c7d6-71b0-4951-a075-e07aedf2adcf confs: [default] 0 artifacts copied, 2 already retrieved (0kB/13ms)
23/12/08 22:40:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/08 22:40:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 23/12/08 22:40:02 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042. 23/12/08 22:40:02 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043. 3.3.0
bucket = 'dsan-6000-group-35'
submissions = spark.read.parquet(f"s3a://{bucket}/submissions_with_sentiment.parquet")
23/12/08 22:40:06 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
from pyspark.sql import functions as f
submissions = submissions.filter(f.col('subreddit')=='TaylorSwift')
submissions.printSchema()
root |-- year: integer (nullable = true) |-- month: integer (nullable = true) |-- subreddit: string (nullable = true) |-- id: string (nullable = true) |-- author: string (nullable = true) |-- created_utc: timestamp (nullable = true) |-- text: string (nullable = true) |-- num_comments: long (nullable = true) |-- num_crossposts: long (nullable = true) |-- score: long (nullable = true) |-- is_self: boolean (nullable = true) |-- stickied: boolean (nullable = true) |-- sentiment: string (nullable = true)
submissions = submissions.withColumn("is_viral",((f.col("num_comments") > 100)|(f.col("num_crossposts")>10)|(f.col("score")>500)))
submissions.printSchema()
root |-- year: integer (nullable = true) |-- month: integer (nullable = true) |-- subreddit: string (nullable = true) |-- id: string (nullable = true) |-- author: string (nullable = true) |-- created_utc: timestamp (nullable = true) |-- text: string (nullable = true) |-- num_comments: long (nullable = true) |-- num_crossposts: long (nullable = true) |-- score: long (nullable = true) |-- is_self: boolean (nullable = true) |-- stickied: boolean (nullable = true) |-- sentiment: string (nullable = true) |-- is_viral: boolean (nullable = true)
from pyspark.sql.functions import col, length
submissions = submissions.withColumn("is_self", col("is_self").cast("string"))
submissions = submissions.withColumn("stickied", col("stickied").cast("string"))
submissions = submissions.withColumn("is_viral", col("is_viral").cast("string"))
submissions = submissions.withColumn('text_length',f.length('text'))
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer,RegexTokenizer,StopWordsRemover,VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler
from pyspark.ml.regression import DecisionTreeRegressor,RandomForestRegressor,GBTRegressor,GeneralizedLinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
stringIndexer_sentiment = StringIndexer(inputCol="sentiment", outputCol="sentiment_idx")
stringIndexer_is_self = StringIndexer(inputCol="is_self", outputCol="is_self_idx")
stringIndexer_stickied = StringIndexer(inputCol="stickied", outputCol="stickied_idx")
# Define OneHotEncoder stages
onehot_sentiment = OneHotEncoder(inputCol="sentiment_idx", outputCol="sentiment_vec")
onehot_is_self = OneHotEncoder(inputCol="is_self_idx", outputCol="is_self_vec")
onehot_stickied = OneHotEncoder(inputCol="stickied_idx", outputCol="stickied_vec")
submissions.printSchema()
root |-- year: integer (nullable = true) |-- month: integer (nullable = true) |-- subreddit: string (nullable = true) |-- id: string (nullable = true) |-- author: string (nullable = true) |-- created_utc: timestamp (nullable = true) |-- text: string (nullable = true) |-- num_comments: long (nullable = true) |-- num_crossposts: long (nullable = true) |-- score: long (nullable = true) |-- is_self: string (nullable = true) |-- stickied: string (nullable = true) |-- sentiment: string (nullable = true) |-- is_viral: string (nullable = true) |-- text_length: integer (nullable = true)
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
assembler = VectorAssembler(inputCols=["year", "month", "num_comments", "num_crossposts", "score",
"is_self_vec", "stickied_vec", "sentiment_vec", "text_length"], outputCol="features")
# Define label indexer
labelIndexer = StringIndexer(inputCol="is_viral", outputCol="label")
# Initialize models
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
lr = LogisticRegression(labelCol="label", featuresCol="features")
# Create pipelines with all stages
pipeline_rf = Pipeline(stages=[stringIndexer_sentiment,
stringIndexer_is_self,
stringIndexer_stickied,
onehot_sentiment,
onehot_is_self,
onehot_stickied,
assembler,
labelIndexer,
rf])
pipeline_lr = Pipeline(stages=[stringIndexer_sentiment,
stringIndexer_is_self,
stringIndexer_stickied,
onehot_sentiment,
onehot_is_self,
onehot_stickied,
assembler,
labelIndexer, lr])
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# Split the data
train, test = submissions.randomSplit([0.8, 0.2],123)
# Fit models
model_rf = pipeline_rf.fit(train)
model_lr = pipeline_lr.fit(train)
# Predict
rf_pred = model_rf.transform(test)
lr_pred= model_lr.transform(test)
# Evaluators
evaluator_roc = BinaryClassificationEvaluator(metricName="areaUnderROC")
evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1")
# Evaluate models
roc_rf = evaluator_roc.evaluate(rf_pred)
roc_lr = evaluator_roc.evaluate(lr_pred)
f1_rf = evaluator_f1.evaluate(rf_pred)
f1_lr = evaluator_f1.evaluate(lr_pred)
# Print results
print(f"Random Forest: ROC AUC = {roc_rf}, F1-score = {f1_rf}")
print(f"Logistic Regression: ROC AUC = {roc_lr}, F1-score = {f1_lr}")
[Stage 40:> (0 + 2) / 2]
23/12/08 22:42:05 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS 23/12/08 22:42:05 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
[Stage 99:> (0 + 2) / 2]
Random Forest: ROC AUC = 0.9970112332703939, F1-score = 0.9836765821487745 Logistic Regression: ROC AUC = 0.9985240442435646, F1-score = 0.9837274763112296
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark import SparkContext
#label = rf_pred.select("label")
#pred = rf_pred.select("prediction")
pred_and_label = rf_pred.select(['prediction','label'])
metrics = MulticlassMetrics(pred_and_label.rdd.map(tuple))
/opt/conda/lib/python3.10/site-packages/pyspark/sql/context.py:157: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead. warnings.warn(
confusion_matrix_rf = metrics.confusionMatrix().toArray()
from numpy import savetxt
savetxt("confusion_matrix_rf.csv",confusion_matrix_rf,delimiter=',')
pred_and_label = lr_pred.select(['prediction','label'])
metrics = MulticlassMetrics(pred_and_label.rdd.map(tuple))
confusion_matrix_lr = metrics.confusionMatrix().toArray()
savetxt("confusion_matrix_lr.csv",confusion_matrix_lr,delimiter=',')
/opt/conda/lib/python3.10/site-packages/pyspark/sql/context.py:157: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead. warnings.warn(
import pandas as pd
import matplotlib.pyplot as plt
model_names = ["Random Forest", "Logistic Regression"]
roc_values = [roc_rf, roc_lr]
f1_values = [f1_rf, f1_lr]
evaluation_table = pd.DataFrame({
"Model": ["Random Forest", "Logistic Regression"],
"ROC AUC": [roc_rf, roc_lr],
"F1-score": [f1_rf, f1_lr]
})
evaluation_table
Model | ROC AUC | F1-score | |
---|---|---|---|
0 | Random Forest | 0.997639 | 0.987101 |
1 | Logistic Regression | 0.998537 | 0.983676 |
evaluation_table.to_csv('mx109_summary_table.csv')
# Creating the bar chart
fig, ax = plt.subplots(figsize=(10, 6))
evaluation_table.plot(kind='bar', x='Model', y=['ROC AUC', 'F1-score'], ax=ax)
plt.title('Model Evaluation Metrics')
plt.ylabel('Scores')
plt.xticks(rotation=0)
plt.grid(True)
plt.savefig('mx109_model_evaluation.png')
# Displaying the table and the chart
plt.show()
We recently made an effort to examine the popularity of Taylor Swift-related content on Reddit, and the results were impressive. Using advanced machine learning tools, our team is able to decode the elements that help certain posts go viral. The process involves carefully defining what makes a post viral—taking into account number of comments, score, etc.—and using models that can predict which posts are likely to go viral. The results are remarkable. For example, random forest models achieve near-perfect scores on standard evaluation metrics, and logistic regression models are not far behind. This revolutionizes how we plan content, providing a scientific approach to identifying and promoting posts that are likely to be popular on Reddit.
The project started by setting up a PySpark environment to efficiently manage large data sets. We selected Taylor Swift-related posts on Reddit and organized the data for better analysis. Important metrics such as number of comments, number of shares, total score, and post length were evaluated. We also take into account factors such as post sentiment. These variables are prepared for the machine learning process through specific techniques such as StringIndexing and OneHotEncoding.
We chose two models: random forest and logistic regression because of their ability to handle different types of data. After dividing the data into training and testing parts, we train the model and then evaluate its performance. The evaluation focuses on ROC AUC and F1 score, which are common measures for this type of task. The near-perfect score of the random forest model demonstrates its superior ability to differentiate between viral and non-viral posts. The logistic regression model also showed impressive results, proving its value in predicting viral content.
The success of our model demonstrates a strong connection between certain characteristics, such as sentiment and engagement metrics, and the viral potential of a post. This insight is invaluable for posts/authors aiming to go viral on platforms like Reddit. Visual tools are key to sharing our findings. Graphs showing the most influential features in the random forest model along with the performance curves of the two models help make our complex analysis easy to understand, especially for those unfamiliar with the technical details.
The project laid the foundation for ongoing analysis. The flexibility of our models means they can be updated with new data, keeping our approach to identifying viral content at the forefront of digital strategy.