Section 1: File Handling

Data doesn’t magically appear in your Python code - it lives in files. CSV files from your CRM, Excel exports from accounting, JSON data from APIs. File handling is your bridge between the messy real world and clean Python analysis. How do you connect your code to real data sources? Learn these patterns, and you can work with any data source. Miss this, and you’re stuck with manually copying and pasting data forever.

Introduction

File handling is important for data science because most real-world data comes from files. You need to read data from CSV files, text files, and other formats, then write results back to files for further analysis or reporting.

Reading Text Files

Python provides simple ways to read text files for data processing.

Basic File Reading

# Read entire file at once
with open('data.txt', 'r') as file:
    content = file.read()
    print(content)

# Read file line by line
with open('data.txt', 'r') as file:
    for line in file:
        print(line.strip())  # strip() removes newline characters

# Read all lines into a list
with open('data.txt', 'r') as file:
    lines = file.readlines()
    print(f"File has {len(lines)} lines")

File Reading with Error Handling

def read_file_safely(filename):
    """Read file with proper error handling"""
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            content = file.read()
            return content
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found")
        return None
    except PermissionError:
        print(f"Error: Permission denied for '{filename}'")
        return None
    except UnicodeDecodeError:
        print(f"Error: Cannot decode file '{filename}' as UTF-8")
        return None

# Test the function
content = read_file_safely('sales_data.txt')
if content:
    print(f"File content length: {len(content)} characters")

Writing Text Files

Writing files lets you save analysis results and create reports.

Basic File Writing

# Write text to file
data = "Sales Report\nTotal: $50,000\nAverage: $5,000"

with open('report.txt', 'w') as file:
    file.write(data)

# Append to existing file
with open('report.txt', 'a') as file:
    file.write("\nGenerated on: 2024-01-15")

# Write multiple lines
lines = ["Line 1", "Line 2", "Line 3"]
with open('output.txt', 'w') as file:
    for line in lines:
        file.write(line + '\n')

Writing Data Analysis Results

def save_analysis_report(results, filename):
    """Save analysis results to file"""
    try:
        with open(filename, 'w', encoding='utf-8') as file:
            file.write("Data Analysis Report\n")
            file.write("=" * 30 + "\n\n")
            
            for key, value in results.items():
                file.write(f"{key}: {value}\n")
        
        print(f"Report saved to {filename}")
        return True
    except Exception as e:
        print(f"Error saving report: {e}")
        return False

# Example usage
analysis_results = {
    'Total Sales': 150000,
    'Average Sale': 1500,
    'Number of Transactions': 100,
    'Best Day': 'Monday'
}

save_analysis_report(analysis_results, 'sales_analysis.txt')

CSV File Processing

CSV files are common in data science. Python provides excellent tools for reading and writing CSV data.

Reading CSV Files

import csv

# Read CSV file
def read_csv_file(filename):
    """Read CSV file and return data as list of dictionaries"""
    data = []
    try:
        with open(filename, 'r', newline='', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            for row in reader:
                data.append(row)
        return data
    except FileNotFoundError:
        print(f"File {filename} not found")
        return []
    except Exception as e:
        print(f"Error reading CSV: {e}")
        return []

# Example usage
sales_data = read_csv_file('sales.csv')
print(f"Loaded {len(sales_data)} records")

# Display first few records
for i, record in enumerate(sales_data[:3]):
    print(f"Record {i+1}: {record}")

Writing CSV Files

def write_csv_file(data, filename):
    """Write data to CSV file"""
    if not data:
        print("No data to write")
        return False
    
    try:
        with open(filename, 'w', newline='', encoding='utf-8') as file:
            # Get fieldnames from first record
            fieldnames = data[0].keys()
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            
            # Write header
            writer.writeheader()
            
            # Write data
            writer.writerows(data)
        
        print(f"Data saved to {filename}")
        return True
    except Exception as e:
        print(f"Error writing CSV: {e}")
        return False

# Example usage
customer_data = [
    {'name': 'Alice', 'email': 'alice@email.com', 'purchases': 5},
    {'name': 'Bob', 'email': 'bob@email.com', 'purchases': 3},
    {'name': 'Carol', 'email': 'carol@email.com', 'purchases': 8}
]

write_csv_file(customer_data, 'customers.csv')

Processing CSV Data

def analyze_sales_csv(filename):
    """Analyze sales data from CSV file"""
    data = read_csv_file(filename)
    if not data:
        return None
    
    # Convert string values to numbers
    for record in data:
        try:
            record['amount'] = float(record['amount'])
        except (ValueError, KeyError):
            record['amount'] = 0
    
    # Calculate statistics
    amounts = [record['amount'] for record in data]
    total_sales = sum(amounts)
    average_sale = total_sales / len(amounts)
    max_sale = max(amounts) if amounts else 0
    min_sale = min(amounts) if amounts else 0
    
    return {
        'total_records': len(data),
        'total_sales': total_sales,
        'average_sale': average_sale,
        'max_sale': max_sale,
        'min_sale': min_sale
    }

# Example usage
results = analyze_sales_csv('sales.csv')
if results:
    print("Sales Analysis Results:")
    for key, value in results.items():
        print(f"{key}: {value}")

Advanced File Operations

Working with Different File Formats

import json

def save_json_data(data, filename):
    """Save data as JSON file"""
    try:
        with open(filename, 'w', encoding='utf-8') as file:
            json.dump(data, file, indent=2, ensure_ascii=False)
        print(f"JSON data saved to {filename}")
        return True
    except Exception as e:
        print(f"Error saving JSON: {e}")
        return False

def load_json_data(filename):
    """Load data from JSON file"""
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            data = json.load(file)
        return data
    except FileNotFoundError:
        print(f"File {filename} not found")
        return None
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON in {filename}")
        return None
    except Exception as e:
        print(f"Error loading JSON: {e}")
        return None

# Example usage
analysis_results = {
    'summary': {
        'total_sales': 150000,
        'average_sale': 1500,
        'top_customer': 'Alice Johnson'
    },
    'details': [
        {'customer': 'Alice Johnson', 'sales': 50000},
        {'customer': 'Bob Smith', 'sales': 30000},
        {'customer': 'Carol Davis', 'sales': 70000}
    ]
}

save_json_data(analysis_results, 'analysis_results.json')
loaded_data = load_json_data('analysis_results.json')

File Path Operations

import os
from pathlib import Path

def organize_data_files(directory):
    """Organize data files by type"""
    if not os.path.exists(directory):
        print(f"Directory {directory} does not exist")
        return
    
    # Create subdirectories
    csv_dir = os.path.join(directory, 'csv_files')
    txt_dir = os.path.join(directory, 'text_files')
    json_dir = os.path.join(directory, 'json_files')
    
    for subdir in [csv_dir, txt_dir, json_dir]:
        os.makedirs(subdir, exist_ok=True)
    
    # Move files to appropriate directories
    for filename in os.listdir(directory):
        if filename.endswith('.csv'):
            os.rename(
                os.path.join(directory, filename),
                os.path.join(csv_dir, filename)
            )
        elif filename.endswith('.txt'):
            os.rename(
                os.path.join(directory, filename),
                os.path.join(txt_dir, filename)
            )
        elif filename.endswith('.json'):
            os.rename(
                os.path.join(directory, filename),
                os.path.join(json_dir, filename)
            )

# Example usage
organize_data_files('data_files')

Practice Exercise

Create a comprehensive file processing system for data analysis:

import csv
import json
import os
from datetime import datetime

class DataFileProcessor:
    """Comprehensive file processing system for data analysis"""
    
    def __init__(self, data_directory):
        self.data_directory = data_directory
        self.processed_files = []
        self.errors = []
    
    def process_sales_data(self, input_file, output_file):
        """Process sales data from CSV and generate analysis report"""
        try:
            # Read CSV data
            sales_data = self.read_csv_file(input_file)
            if not sales_data:
                return False
            
            # Process data
            analysis = self.analyze_sales_data(sales_data)
            
            # Generate report
            report = self.generate_sales_report(analysis)
            
            # Save results
            self.save_text_file(report, output_file)
            
            # Save processed data as JSON
            json_file = output_file.replace('.txt', '.json')
            self.save_json_file(analysis, json_file)
            
            self.processed_files.append(input_file)
            return True
            
        except Exception as e:
            error_msg = f"Error processing {input_file}: {e}"
            self.errors.append(error_msg)
            print(error_msg)
            return False
    
    def read_csv_file(self, filename):
        """Read CSV file with error handling"""
        filepath = os.path.join(self.data_directory, filename)
        data = []
        
        try:
            with open(filepath, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    # Convert numeric fields
                    if 'amount' in row:
                        try:
                            row['amount'] = float(row['amount'])
                        except ValueError:
                            row['amount'] = 0
                    data.append(row)
            return data
        except FileNotFoundError:
            print(f"File {filepath} not found")
            return []
        except Exception as e:
            print(f"Error reading {filename}: {e}")
            return []
    
    def analyze_sales_data(self, data):
        """Analyze sales data and return comprehensive results"""
        if not data:
            return {}
        
        # Basic statistics
        amounts = [record.get('amount', 0) for record in data]
        total_sales = sum(amounts)
        average_sale = total_sales / len(amounts) if amounts else 0
        
        # Customer analysis
        customer_totals = {}
        for record in data:
            customer = record.get('customer', 'Unknown')
            amount = record.get('amount', 0)
            customer_totals[customer] = customer_totals.get(customer, 0) + amount
        
        # Find top customer
        top_customer = max(customer_totals.items(), key=lambda x: x[1]) if customer_totals else ('None', 0)
        
        # Monthly analysis (if date field exists)
        monthly_sales = {}
        for record in data:
            if 'date' in record:
                try:
                    # Extract month from date (assuming YYYY-MM-DD format)
                    month = record['date'][:7]  # YYYY-MM
                    monthly_sales[month] = monthly_sales.get(month, 0) + record.get('amount', 0)
                except:
                    pass
        
        return {
            'summary': {
                'total_records': len(data),
                'total_sales': total_sales,
                'average_sale': average_sale,
                'max_sale': max(amounts) if amounts else 0,
                'min_sale': min(amounts) if amounts else 0
            },
            'top_customer': {
                'name': top_customer[0],
                'total_sales': top_customer[1]
            },
            'customer_analysis': customer_totals,
            'monthly_analysis': monthly_sales,
            'analysis_date': datetime.now().isoformat()
        }
    
    def generate_sales_report(self, analysis):
        """Generate human-readable sales report"""
        report = f"""
SALES ANALYSIS REPORT
{'=' * 50}
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

SUMMARY:
- Total Records: {analysis['summary']['total_records']}
- Total Sales: ${analysis['summary']['total_sales']:,.2f}
- Average Sale: ${analysis['summary']['average_sale']:,.2f}
- Highest Sale: ${analysis['summary']['max_sale']:,.2f}
- Lowest Sale: ${analysis['summary']['min_sale']:,.2f}

TOP CUSTOMER:
- Name: {analysis['top_customer']['name']}
- Total Sales: ${analysis['top_customer']['total_sales']:,.2f}

CUSTOMER BREAKDOWN:
"""
        
        for customer, total in analysis['customer_analysis'].items():
            report += f"- {customer}: ${total:,.2f}\n"
        
        if analysis['monthly_analysis']:
            report += "\nMONTHLY BREAKDOWN:\n"
            for month, total in analysis['monthly_analysis'].items():
                report += f"- {month}: ${total:,.2f}\n"
        
        return report
    
    def save_text_file(self, content, filename):
        """Save text content to file"""
        filepath = os.path.join(self.data_directory, filename)
        try:
            with open(filepath, 'w', encoding='utf-8') as file:
                file.write(content)
            print(f"Text file saved: {filename}")
        except Exception as e:
            print(f"Error saving text file {filename}: {e}")
    
    def save_json_file(self, data, filename):
        """Save data as JSON file"""
        filepath = os.path.join(self.data_directory, filename)
        try:
            with open(filepath, 'w', encoding='utf-8') as file:
                json.dump(data, file, indent=2, ensure_ascii=False)
            print(f"JSON file saved: {filename}")
        except Exception as e:
            print(f"Error saving JSON file {filename}: {e}")
    
    def get_processing_summary(self):
        """Get summary of processing results"""
        return {
            'processed_files': len(self.processed_files),
            'errors': len(self.errors),
            'success_rate': len(self.processed_files) / (len(self.processed_files) + len(self.errors)) * 100 if (len(self.processed_files) + len(self.errors)) > 0 else 0
        }

# Example usage
processor = DataFileProcessor('data')

# Process sales data
success = processor.process_sales_data('sales.csv', 'sales_report.txt')

if success:
    summary = processor.get_processing_summary()
    print(f"Processing complete!")
    print(f"Files processed: {summary['processed_files']}")
    print(f"Errors: {summary['errors']}")
    print(f"Success rate: {summary['success_rate']:.1f}%")
else:
    print("Processing failed. Check errors above.")

Assets

Resources

  • Python file handling: https://docs.python.org/3/tutorial/inputoutput.html#reading-and-writing-files
  • CSV module documentation: https://docs.python.org/3/library/csv.html
  • JSON module documentation: https://docs.python.org/3/library/json.html
  • Pathlib tutorial: https://realpython.com/python-pathlib/

Summary

File handling is important for data science workflows. Key concepts include reading and writing text files, processing CSV data, handling different file formats, and managing file operations safely. These skills enable you to work with real-world data sources and save analysis results.


© 2025 Prof. Tim Frenzel. All rights reserved. | Version 1.0.5