🟡 Building a Full-Scale NLP Pipeline for Effective Text Classification


Objective

Build a complete Natural Language Processing (NLP) pipeline for text classification, learning both traditional and modern deep learning approaches. You'll convert text data into numerical representations using embeddings and implement classification models to categorize text effectively.


Learning Outcomes

By completing this project, you will:

  • Master fundamental NLP concepts and text processing techniques
  • Understand and implement different text embedding methods
  • Build text classification models using both traditional ML and deep learning
  • Evaluate and optimize NLP model performance
  • Gain practical experience with industry-standard NLP tools and libraries

Skills Gained

  • Building end-to-end NLP pipelines
  • Implementing modern text embedding techniques
  • Creating text classification systems
  • Using state-of-the-art NLP libraries and models
  • Evaluating and optimizing NLP models
  • Handling real-world text data challenges

Tools Required

# Core libraries
pip install transformers
pip install torch torchvision
pip install scikit-learn
pip install nltk
pip install gensim

# Text processing
pip install spacy
pip install textblob

# Optional: for OpenAI embeddings
pip install openai
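spaCy's models are installed separately from the library itself; the preprocessing examples below assume the small English model:

# Download the spaCy English model used by AdvancedTextPreprocessor
python -m spacy download en_core_web_sm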

Project Structure

text_classification/
│
├── data/
│   ├── raw/
│   │   ├── train.csv
│   │   └── test.csv
│   └── processed/
│
├── src/
│   ├── preprocessing.py
│   ├── embeddings/
│   │   ├── traditional.py
│   │   ├── transformer.py
│   │   └── openai.py
│   ├── models.py
│   └── evaluation.py
│
└── notebooks/
    ├── 1_data_exploration.ipynb
    ├── 2_embeddings.ipynb
    ├── 3_model_training.ipynb
    └── 4_evaluation.ipynb

Steps and Tasks

1. Data Preprocessing

First, let's implement a robust text preprocessing pipeline:

import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import re

class TextPreprocessor:
    def __init__(self):
        # Download required resources once (quiet=True suppresses repeat logs);
        # newer NLTK releases may also need nltk.download('punkt_tab')
        nltk.download('punkt', quiet=True)
        nltk.download('stopwords', quiet=True)
        nltk.download('wordnet', quiet=True)
        self.lemmatizer = WordNetLemmatizer()
        self.stop_words = set(stopwords.words('english'))
        
    def clean_text(self, text):
        """Basic text cleaning"""
        # Convert to lowercase
        text = text.lower()
        
        # Remove special characters and digits
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        
        # Tokenization
        tokens = word_tokenize(text)
        
        # Remove stopwords and lemmatize
        tokens = [self.lemmatizer.lemmatize(token) 
                 for token in tokens 
                 if token not in self.stop_words]
        
        return ' '.join(tokens)
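A quick usage check of the basic preprocessor (the sample sentence and output are illustrative):

preprocessor = TextPreprocessor()
print(preprocessor.clean_text("The cats were running faster than the dogs!"))
# -> roughly: "cat running faster dog"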
Advanced preprocessing (optional):

import spacy
class AdvancedTextPreprocessor:
    def __init__(self):
        self.basic_preprocessor = TextPreprocessor()
        self.nlp = spacy.load('en_core_web_sm')
        
    def extract_entities(self, text):
        """Extract named entities"""
        doc = self.nlp(text)
        entities = [(ent.text, ent.label_) for ent in doc.ents]
        return entities
    
    def get_pos_tags(self, text):
        """Get POS tags"""
        doc = self.nlp(text)
        return [(token.text, token.pos_) for token in doc]
    
    def clean_and_augment(self, text):
        """Clean text and add linguistic features"""
        # Basic cleaning
        clean_text = self.basic_preprocessor.clean_text(text)
        
        # Get entities and POS tags
        entities = self.extract_entities(text)
        pos_tags = self.get_pos_tags(text)
        
        return {
            'clean_text': clean_text,
            'entities': entities,
            'pos_tags': pos_tags
        }

2. Text Embeddings

Implement different embedding approaches:

from transformers import AutoTokenizer, AutoModel
import torch

class TextEmbedder:
    def __init__(self, model_name='bert-base-uncased'):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.model.eval()  # inference only: disables dropout
        
    def get_bert_embeddings(self, texts):
        """Get BERT embeddings for texts"""
        # Tokenize
        encoded = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors='pt'
        )
        
        # Get embeddings (use the [CLS] token as the sentence representation)
        with torch.no_grad():
            outputs = self.model(**encoded)
            embeddings = outputs.last_hidden_state[:, 0, :]
            
        return embeddings.numpy()
Additional embedding methods (optional):

import numpy as np
from gensim.models import Word2Vec
class MultiEmbedder:
    def __init__(self):
        self.bert_embedder = TextEmbedder()
        
    def get_word2vec_embeddings(self, texts, vector_size=100):
        """Get Word2Vec embeddings"""
        # Train Word2Vec model
        tokenized_texts = [text.split() for text in texts]
        w2v_model = Word2Vec(
            sentences=tokenized_texts,
            vector_size=vector_size,
            window=5,
            min_count=1
        )
        
        # Get sentence embeddings by averaging word vectors
        embeddings = []
        for text in tokenized_texts:
            vectors = [w2v_model.wv[word] for word in text if word in w2v_model.wv]
            if vectors:
                embeddings.append(np.mean(vectors, axis=0))
            else:
                embeddings.append(np.zeros(vector_size))
                
        return np.array(embeddings)
    
    def get_openai_embeddings(self, texts):
        """Get OpenAI embeddings (requires an OPENAI_API_KEY)"""
        # Note: this uses the legacy openai<1.0 interface; with openai>=1.0,
        # use OpenAI().embeddings.create(...) instead
        import openai
        
        embeddings = []
        for text in texts:
            response = openai.Embedding.create(
                input=text,
                model="text-embedding-ada-002"
            )
            embeddings.append(response['data'][0]['embedding'])
            
        return np.array(embeddings)
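The project layout also reserves src/embeddings/traditional.py; a TF-IDF baseline is a natural fit there. A minimal sketch with scikit-learn (parameter choices are illustrative):

from sklearn.feature_extraction.text import TfidfVectorizer

def get_tfidf_features(train_texts, test_texts, max_features=5000):
    """Fit TF-IDF on the training split only, then transform both splits."""
    vectorizer = TfidfVectorizer(max_features=max_features, ngram_range=(1, 2))
    X_train = vectorizer.fit_transform(train_texts)
    X_test = vectorizer.transform(test_texts)  # avoids leaking test vocabulary
    return X_train, X_test, vectorizer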

3. Model Implementation

Create text classification models:

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

class TextClassifier:
    def __init__(self, model_type='rf'):
        if model_type == 'rf':
            self.model = RandomForestClassifier(
                n_estimators=100,
                max_depth=None,
                min_samples_split=2,
                random_state=42
            )
        else:
            self.model = LogisticRegression(
                max_iter=1000,
                random_state=42
            )
    
    def train(self, X_train, y_train):
        """Train the classifier"""
        self.model.fit(X_train, y_train)
        
    def predict(self, X):
        """Make predictions"""
        return self.model.predict(X)
Deep learning model (optional):

import torch
import torch.nn as nn
import torch.optim as optim
class DeepTextClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim // 2, num_classes)
        )
        
    def forward(self, x):
        return self.layers(x)
    
class TextClassificationTrainer:
    def __init__(self, model, device=None):
        # Fall back to CPU when no GPU is available
        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = model.to(self.device)
        
    def train(self, train_loader, val_loader, epochs=10):
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.model.parameters())
        
        for epoch in range(epochs):
            self.model.train()
            train_loss = 0
            for batch in train_loader:
                optimizer.zero_grad()
                inputs, labels = batch
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                
                outputs = self.model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                
                train_loss += loss.item()
            
            # Validation
            val_loss, val_acc = self.evaluate(val_loader)
            print(f'Epoch {epoch+1}: Train Loss = {train_loss/len(train_loader):.4f}, '
                  f'Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}')

    def evaluate(self, data_loader):
        """Compute average loss and accuracy over a data loader"""
        criterion = nn.CrossEntropyLoss()
        self.model.eval()
        total_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for inputs, labels in data_loader:
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                outputs = self.model(inputs)
                total_loss += criterion(outputs, labels).item()
                correct += (outputs.argmax(dim=1) == labels).sum().item()
                total += labels.size(0)
        return total_loss / len(data_loader), correct / total
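To connect the pieces, here is a minimal sketch that wraps precomputed embeddings in PyTorch DataLoaders and trains the deep classifier (array names and hyperparameters are illustrative):

import torch
from torch.utils.data import TensorDataset, DataLoader

# X_train/X_val: numpy arrays of embeddings; y_train/y_val: integer labels
train_ds = TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                         torch.tensor(y_train, dtype=torch.long))
val_ds = TensorDataset(torch.tensor(X_val, dtype=torch.float32),
                       torch.tensor(y_val, dtype=torch.long))

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=32)

model = DeepTextClassifier(input_dim=X_train.shape[1], hidden_dim=256, num_classes=4)
trainer = TextClassificationTrainer(model)
trainer.train(train_loader, val_loader, epochs=10)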

4. Evaluation

Implement comprehensive evaluation metrics:

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

class ModelEvaluator:
    def __init__(self):
        self.metrics = {}
        
    def evaluate(self, y_true, y_pred):
        """Calculate classification metrics"""
        self.metrics['accuracy'] = accuracy_score(y_true, y_pred)
        self.metrics['report'] = classification_report(y_true, y_pred)
        
        return self.metrics
    
    def plot_confusion_matrix(self, y_true, y_pred, classes):
        """Plot confusion matrix"""
        cm = confusion_matrix(y_true, y_pred)
        
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                   xticklabels=classes, yticklabels=classes)
        plt.title('Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.show()
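A short usage sketch connecting the evaluator to a trained classifier (variable names are illustrative):

evaluator = ModelEvaluator()
y_pred = classifier.predict(X_test)
metrics = evaluator.evaluate(y_test, y_pred)
print(f"Accuracy: {metrics['accuracy']:.4f}")
print(metrics['report'])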
Advanced evaluation techniques (optional):

import numpy as np
from sklearn.model_selection import cross_val_score, learning_curve
class AdvancedEvaluator:
    def __init__(self):
        self.basic_evaluator = ModelEvaluator()
        
    def cross_validate_model(self, model, X, y, cv=5):
        """Perform cross-validation"""
        cv_scores = cross_val_score(model, X, y, cv=cv)
        return {
            'mean_cv_score': cv_scores.mean(),
            'std_cv_score': cv_scores.std()
        }
    
    def learning_curve_analysis(self, model, X, y):
        """Plot learning curves"""
        train_sizes, train_scores, val_scores = learning_curve(
            model, X, y, cv=5, n_jobs=-1, 
            train_sizes=np.linspace(0.1, 1.0, 10))
        
        plt.figure(figsize=(10, 6))
        plt.plot(train_sizes, np.mean(train_scores, axis=1), label='Training score')
        plt.plot(train_sizes, np.mean(val_scores, axis=1), label='Cross-validation score')
        plt.title('Learning Curves')
        plt.xlabel('Training Examples')
        plt.ylabel('Score')
        plt.legend(loc='best')
        plt.grid(True)
        plt.show()

5. Model Deployment

Set up a simple API for text classification:

from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import uvicorn

class TextInput(BaseModel):
    text: str

app = FastAPI()

# Load heavyweight components once at startup rather than on every request
preprocessor = TextPreprocessor()
embedder = TextEmbedder()
model = joblib.load("classifier.joblib")  # a trained TextClassifier; path is illustrative

@app.post("/classify/")
async def classify_text(input_data: TextInput):
    # Preprocess
    clean_text = preprocessor.clean_text(input_data.text)

    # Get embeddings
    embedding = embedder.get_bert_embeddings([clean_text])

    # Make prediction
    prediction = model.predict(embedding)[0]

    return {"prediction": str(prediction)}  # cast numpy types for JSON serialization
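Start the server with uvicorn app:app --reload (assuming the code above is saved as app.py), then exercise the endpoint; a minimal client sketch using requests:

import requests

response = requests.post(
    "http://127.0.0.1:8000/classify/",
    json={"text": "This movie was absolutely fantastic!"},
)
print(response.json())  # e.g. {"prediction": "positive"}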
Advanced deployment setup (optional):
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import hashlib
import mlflow
import redis

class ProductionClassifier:
    def __init__(self):
        # Load models and preprocessing tools
        self.preprocessor = AdvancedTextPreprocessor()
        self.embedder = MultiEmbedder()
        
        # Initialize Redis for caching
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        
        # Load models from MLflow
        self.models = self.load_models()
        
    def load_models(self):
        """Load models from MLflow"""
        models = {}
        for model_name in ['bert', 'traditional']:
            model_path = f"models:/{model_name}/Production"
            models[model_name] = mlflow.pyfunc.load_model(model_path)
        return models
    
    async def get_cached_prediction(self, text_hash):
        """Check cache for previous predictions"""
        return self.redis_client.get(text_hash)
    
    async def cache_prediction(self, text_hash, prediction, expire_time=3600):
        """Cache prediction results"""
        self.redis_client.setex(text_hash, expire_time, prediction)

class ModelServer:
    def __init__(self):
        self.app = FastAPI()
        self.classifier = ProductionClassifier()
        
        # Add CORS middleware
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
        
        # Add routes
        self.setup_routes()
        
    def setup_routes(self):
        @self.app.post("/classify/")
        async def classify_text(input_data: TextInput):
            try:
                # Check cache; use a stable digest rather than hash(), whose
                # value changes between processes with hash randomization
                text_hash = hashlib.sha256(input_data.text.encode()).hexdigest()
                cached_result = await self.classifier.get_cached_prediction(text_hash)
                
                if cached_result:
                    return {"prediction": cached_result, "source": "cache"}
                
                # Process new prediction
                result = await self.process_text(input_data.text)
                
                # Cache result
                await self.classifier.cache_prediction(text_hash, result)
                
                return {"prediction": result, "source": "model"}
                
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))
        
        @self.app.get("/health/")
        async def health_check():
            return {"status": "healthy"}
        
    async def process_text(self, text):
        # Implement processing pipeline
        clean_text = self.classifier.preprocessor.clean_text(text)
        embedding = self.classifier.embedder.get_bert_embeddings([clean_text])
        prediction = self.classifier.models['bert'].predict(embedding)[0]
        return prediction

# Run server
if __name__ == "__main__":
    server = ModelServer()
    uvicorn.run(server.app, host="0.0.0.0", port=8000)

6. Performance Monitoring and Maintenance

Set up monitoring for the deployed model:

from datetime import datetime
import logging
import os

class ModelMonitor:
    def __init__(self):
        self.setup_logging()
        
    def setup_logging(self):
        os.makedirs('logs', exist_ok=True)  # make sure the log directory exists
        logging.basicConfig(
            filename=f'logs/model_monitoring_{datetime.now():%Y%m%d}.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        
    def log_prediction(self, text, prediction, confidence):
        """Log each prediction with metadata"""
        logging.info(
            f"Prediction made:\n"
            f"Text length: {len(text)}\n"
            f"Prediction: {prediction}\n"
            f"Confidence: {confidence:.4f}"
        )
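A sketch of wiring the monitor into a prediction path (the helper and its arguments are illustrative):

monitor = ModelMonitor()

def classify_and_log(text, model, embedder, preprocessor):
    """Classify one text and record the prediction for monitoring."""
    clean = preprocessor.clean_text(text)
    embedding = embedder.get_bert_embeddings([clean])
    prediction = model.predict(embedding)[0]
    # Confidence from the underlying sklearn estimator (assumes predict_proba exists)
    confidence = model.model.predict_proba(embedding).max()
    monitor.log_prediction(text, prediction, confidence)
    return prediction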
Advanced monitoring setup (optional):
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
import plotly.graph_objects as go

class AdvancedModelMonitor:
    def __init__(self):
        self.basic_monitor = ModelMonitor()
        self.predictions_log = []
        self.performance_metrics = {}
        
    def log_prediction_with_features(self, text, prediction, actual=None, metadata=None):
        """Log detailed prediction information"""
        log_entry = {
            'timestamp': datetime.now(),
            'text_length': len(text),
            'prediction': prediction,
            'actual': actual,
            'metadata': metadata or {}
        }
        self.predictions_log.append(log_entry)
        
    def calculate_performance_metrics(self, window_size=1000):
        """Calculate rolling performance metrics"""
        if len(self.predictions_log) < window_size:
            return
            
        recent_predictions = pd.DataFrame(self.predictions_log[-window_size:])
        
        # Calculate accuracy if actual labels are available
        if 'actual' in recent_predictions.columns:
            accuracy = accuracy_score(
                recent_predictions['actual'],
                recent_predictions['prediction']
            )
            self.performance_metrics['accuracy'] = accuracy
        
        # Calculate prediction distribution
        pred_dist = recent_predictions['prediction'].value_counts(normalize=True)
        self.performance_metrics['prediction_distribution'] = pred_dist.to_dict()
        
    def plot_performance_trends(self):
        """Plot performance trends over time"""
        df = pd.DataFrame(self.predictions_log)
        
        # Create rolling accuracy plot
        fig = go.Figure()
        
        if 'actual' in df.columns:
            # rolling().apply() operates per column, so compute a per-row
            # correctness flag first and take its rolling mean
            correct = (df['actual'] == df['prediction']).astype(float)
            rolling_accuracy = correct.rolling(window=100).mean()
            
            fig.add_trace(go.Scatter(
                x=df['timestamp'],
                y=rolling_accuracy,
                mode='lines',
                name='Rolling Accuracy'
            ))
            
        # Add prediction distribution
        pred_dist_over_time = df.groupby([
            pd.Grouper(key='timestamp', freq='D'),
            'prediction'
        ]).size().unstack(fill_value=0)
        
        for col in pred_dist_over_time.columns:
            fig.add_trace(go.Scatter(
                x=pred_dist_over_time.index,
                y=pred_dist_over_time[col],
                mode='lines',
                name=f'Class {col} predictions'
            ))
            
        fig.update_layout(
            title='Model Performance Trends',
            xaxis_title='Time',
            yaxis_title='Metric Value',
            hovermode='x unified'
        )
        
        fig.show()
        
    def generate_performance_report(self):
        """Generate comprehensive performance report"""
        report = {
            'timestamp': datetime.now(),
            'total_predictions': len(self.predictions_log),
            'recent_metrics': self.performance_metrics,
            'prediction_volume': len(self.predictions_log[-1000:]),
            'average_text_length': np.mean([
                log['text_length'] for log in self.predictions_log[-1000:]
            ])
        }
        
        return report

7. Best Practices and Optimization

  1. Model Optimization:

    • Use model quantization for faster inference
    • Implement batch prediction for better throughput (a sketch of both appears after this list)
    • Cache frequent predictions
    • Consider using TensorRT for GPU acceleration
  2. Error Handling:

    • Implement robust error handling for malformed inputs
    • Add retry logic for failed predictions
    • Monitor and log edge cases
    • Set up alerts for critical failures
  3. Performance Tips:

    • Use appropriate batch sizes for embedding generation
    • Implement proper text cleaning pipeline
    • Consider using GPU acceleration for large-scale deployment
    • Optimize model loading and caching strategies
  4. Monitoring Best Practices:

    • Set up comprehensive logging
    • Monitor system resources (CPU, memory, GPU)
    • Track prediction latency
    • Implement data drift detection
    • Set up automated alerts for performance degradation
  5. Scaling Considerations:

    • Use load balancing for multiple model instances
    • Implement horizontal scaling for high traffic
    • Consider using Kubernetes for container orchestration
    • Set up auto-scaling based on traffic patterns
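A minimal sketch of the first two optimizations, dynamic quantization and batched embedding generation, using PyTorch (batch size and dtype choices are illustrative):

import torch
import numpy as np

def quantize_for_inference(model):
    """Apply dynamic quantization to Linear layers for faster CPU inference."""
    return torch.quantization.quantize_dynamic(
        model, {torch.nn.Linear}, dtype=torch.qint8
    )

def embed_in_batches(embedder, texts, batch_size=32):
    """Generate embeddings in fixed-size batches instead of one giant call."""
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    return np.vstack([embedder.get_bert_embeddings(batch) for batch in batches])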

This deployment and monitoring setup provides:

  1. Production-ready API implementation
  2. Comprehensive monitoring system
  3. Performance optimization guidelines
  4. Scaling strategies
  5. Best practices for maintaining model performance