Data Loader
Introduction
The maap-data-loader project is designed to streamline data ingestion and processing for the MongoDB AI Applications Program (MAAP). It provides a robust mechanism to load, transform, and manage data efficiently, ensuring seamless integration with MongoDB Atlas. The loader handles both structured and unstructured data, with built-in support for document processing, embedding generation, and integration with various AI services.
Features
- Document Processing: Automated processing of various document formats
- Embedding Generation: Built-in support for generating embeddings using AI services
- MongoDB Integration: Direct integration with MongoDB Atlas for efficient data storage
- Configurable Pipeline: Flexible pipeline configuration for different data processing needs
- Logging System: Comprehensive logging system for tracking operations
- Enterprise Support: Dedicated enterprise features for large-scale deployments
Project Structure
The project is organized as follows:
maap-data-loader/
├── app.py                           # Main application entry point
├── config.py                        # Global configuration settings
├── Dockerfile                       # Container definition for deployment
├── requirements.txt                 # Python dependencies
├── enterprise/                      # Enterprise-specific implementations
│   ├── mongodb_ingest.py            # MongoDB ingestion logic
│   ├── pipeline_executor.py         # Data pipeline execution
│   └── util/                        # Enterprise utilities
│       ├── base_configs.py          # Base configuration classes
│       ├── builder.py               # Pipeline builder
│       └── configs/                 # Configuration components
│           ├── downloader.py        # Data download configurations
│           ├── indexer.py           # Indexing configurations
│           └── source.py            # Data source configurations
├── local/                           # Local development components
│   ├── database/                    # Database interactions
│   ├── models/                      # Data models and schemas
│   ├── services/                    # Core services
│   │   ├── bedrock_service.py       # AWS Bedrock integration
│   │   ├── document_service.py      # Document processing
│   │   └── embedding_service.py     # Embedding generation
│   └── utils/                       # Utility functions
Prerequisites
- Python 3.8 or higher
- MongoDB Atlas account
- Docker (for containerized deployment)
- AWS account (for Bedrock service integration)
- Sufficient storage for document processing
Setup Instructions

1. Clone the Repository

   git clone https://github.com/mongodb-partners/maap-data-loader.git
   cd maap-data-loader

2. Quick Setup Using Make

   The project includes a Makefile for common operations. To get started quickly:

   # View all available commands
   make help

   # Set up virtual environment and install dependencies
   make setup

   # Install additional development dependencies
   make install-dev

   For other operations:

   make test          # Run tests
   make lint          # Run linting and formatting
   make run           # Run the application
   make clean         # Clean up build artifacts
   make docker-build  # Build Docker image
   make docker-run    # Run Docker container
   make logs          # View application logs
   make backup        # Backup processed data

3. Manual Setup

   If you are not using the Makefile, perform the following steps:

   1. Set Up Python Environment

      python -m venv venv
      source venv/bin/activate  # On Windows: venv\Scripts\activate
      pip install -r requirements.txt

   2. Configure Environment Variables

      Create a .env file in the root directory with the following variables:

      MONGODB_URI=your_mongodb_connection_string
      AWS_ACCESS_KEY_ID=your_aws_access_key
      AWS_SECRET_ACCESS_KEY=your_aws_secret_key
      AWS_REGION=your_aws_region

   3. Configure Application Settings

      Update config.py with your specific settings:

      - Database configurations
      - Processing pipeline settings
      - Document processing parameters
      - Embedding service configurations

   4. Run the Application

      python app.py
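Before starting the application, it can be useful to sanity-check the values in your .env file. Below is a minimal sketch, assuming python-dotenv, pymongo, and boto3 are available in the environment (this README does not list the exact dependencies):

import os

import boto3
from dotenv import load_dotenv
from pymongo import MongoClient

# Load MONGODB_URI and the AWS_* variables from the .env file into the environment
load_dotenv()

# Verify MongoDB connectivity with a lightweight ping
client = MongoClient(os.environ["MONGODB_URI"])
client.admin.command("ping")
print("MongoDB connection OK")

# Verify AWS credentials; boto3 reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# from the environment automatically
sts = boto3.client("sts", region_name=os.environ["AWS_REGION"])
print("AWS identity:", sts.get_caller_identity()["Arn"])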
Usage Examples
These examples demonstrate how to interact with the MAAP Data Loader API using cURL commands.
Health Check
Check if the API service is running:
curl -X GET http://localhost:8000/health
Expected response:
{"status": "healthy"}
Upload Files (PDF, DOCX, etc.)
Upload files for processing and embedding generation:
curl -X POST http://localhost:8000/local/upload \
-H "Content-Type: multipart/form-data" \
-F "files=@/path/to/your/document.pdf" \
-F 'json_input_params={
"user_id": "user123",
"mongodb_config": {
"uri": "mongodb+srv://username:password@cluster.mongodb.net",
"database": "maap_db",
"collection": "documents",
"index_name": "vector_index",
"text_field": "text",
"embedding_field": "embedding"
}
}'
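The equivalent upload from Python, mirroring the cURL call above (the files and json_input_params field names are taken from that example; this is a sketch, not part of the shipped tooling):

import json

import requests

params = {
    "user_id": "user123",
    "mongodb_config": {
        "uri": "mongodb+srv://username:password@cluster.mongodb.net",
        "database": "maap_db",
        "collection": "documents",
        "index_name": "vector_index",
        "text_field": "text",
        "embedding_field": "embedding",
    },
}

# Send the file and the JSON parameters as multipart/form-data,
# matching the -F flags in the cURL example
with open("/path/to/your/document.pdf", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/local/upload",
        files={
            "files": f,
            "json_input_params": (None, json.dumps(params)),
        },
    )
print(resp.json())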
Process Web Pages
Process web content without file upload:
curl -X POST http://localhost:8000/local/upload \
-H "Content-Type: multipart/form-data" \
-F 'json_input_params={
"user_id": "user123",
"mongodb_config": {
"uri": "mongodb+srv://username:password@cluster.mongodb.net",
"database": "maap_db",
"collection": "documents",
"index_name": "vector_index",
"text_field": "text",
"embedding_field": "embedding"
},
"web_pages": ["https://www.mongodb.com/docs", "https://www.mongodb.com/atlas"]
}'
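From Python, the web-page variant is the same request with no file attached; requests can send json_input_params as a bare form field (again a sketch based on the cURL call above):

import json

import requests

params = {
    "user_id": "user123",
    "mongodb_config": {
        "uri": "mongodb+srv://username:password@cluster.mongodb.net",
        "database": "maap_db",
        "collection": "documents",
        "index_name": "vector_index",
        "text_field": "text",
        "embedding_field": "embedding",
    },
    "web_pages": ["https://www.mongodb.com/docs", "https://www.mongodb.com/atlas"],
}

# No file part: pass json_input_params as a plain multipart form field
resp = requests.post(
    "http://localhost:8000/local/upload",
    files={"json_input_params": (None, json.dumps(params))},
)
print(resp.json())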
Handling Responses
Successful response example:
{
"success": true,
"message": "Successfully processed 3 documents",
"details": {
"processed_files": ["document1.pdf", "document2.docx", "document3.txt"],
"documents_count": 3
}
}
Error response example:
{
"success": false,
"error": "Invalid MongoDB URI",
"traceback": null
}
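A small helper for branching on these responses (a sketch; the success, message, error, and details fields are taken from the examples above):

def report(resp):
    # Summarize an upload response from the MAAP Data Loader API
    body = resp.json()
    if body.get("success"):
        print(body["message"], body.get("details", {}))
    else:
        print("Upload failed:", body.get("error"))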
Enterprise API Examples
Register a New Source
Local File System
Register a local file system as a data source:
curl --location --request GET 'localhost:8182/register/source' \
--header 'Content-Type: application/json' \
--data-raw '{
"sync_interval_seconds": 360,
"source": {
"source_type": "local",
"params": {
"remote_url": "<source-url-folder-path>",
"chunking_strategy": "by_title",
"chunk_max_characters": "1500",
"chunk_overlap": "100"
}
},
"destination": {
"mongodb_uri": "<your-mongodb-connection-string>",
"database": "<your-db-name>",
"collection": "<your-collection-name>",
"index_name": "default",
"embedding_path": "embeddings",
"embedding_dimensions": 1536,
"id_fields": ["field1", "field2"],
"create_md5": true,
"batch_size": 100
}
}'
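The same registration from Python. Note that the documented endpoint takes a GET request with a JSON body, which requests.request can reproduce; the angle-bracket values are placeholders, as in the cURL example:

import requests

payload = {
    "sync_interval_seconds": 360,
    "source": {
        "source_type": "local",
        "params": {
            "remote_url": "<source-url-folder-path>",
            "chunking_strategy": "by_title",
            "chunk_max_characters": "1500",
            "chunk_overlap": "100",
        },
    },
    "destination": {
        "mongodb_uri": "<your-mongodb-connection-string>",
        "database": "<your-db-name>",
        "collection": "<your-collection-name>",
        "index_name": "default",
        "embedding_path": "embeddings",
        "embedding_dimensions": 1536,
        "id_fields": ["field1", "field2"],
        "create_md5": True,
        "batch_size": 100,
    },
}

# Mirror the cURL call: a GET request carrying the registration body as JSON
resp = requests.request("GET", "http://localhost:8182/register/source", json=payload)
print(resp.json())  # expected: {"message": "Source registered successfully"}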
AWS S3 Bucket
Register an AWS S3 bucket as a data source:
curl --location --request GET 'localhost:8182/register/source' \
--header 'Content-Type: application/json' \
--data-raw '{
"sync_interval_seconds": 360,
"source": {
"source_type": "s3",
"credentials": {
"aws_access_key_id": "<your-aws-access-key-id>",
"aws_secret_access_key": "<your-aws-secret-key>",
"aws_session_token": "<your-aws-session-token>"
},
"params": {
"remote_url": "<source-url-folder-path>",
"chunking_strategy": "by_title",
"chunk_max_characters": "1500",
"chunk_overlap": "100"
}
},
"destination": {
"mongodb_uri": "<your-mongodb-connection-string>",
"database": "<your-db-name>",
"collection": "<your-collection-name>",
"index_name": "default",
"embedding_path": "embeddings",
"embedding_dimensions": 1536,
"id_fields": ["field1", "field2"],
"create_md5": true,
"batch_size": 100
}
}'
Google Drive
Register a Google Drive folder as a data source:
curl --location --request GET 'localhost:8182/register/source' \
--header 'Content-Type: application/json' \
--data-raw '{
"sync_interval_seconds": 360,
"source": {
"source_type": "google-drive",
"credentials": {
"gcp_service_account_key_string": "<gcp_service_account_key_string>",
"google_drive_folder_id": "<google_drive_folder_id>"
},
"params": {
"remote_url": "<source-url-folder-path>",
"chunking_strategy": "by_title",
"chunk_max_characters": "1500",
"chunk_overlap": "100"
}
},
"destination": {
"mongodb_uri": "<your-mongodb-connection-string>",
"database": "<your-db-name>",
"collection": "<your-collection-name>",
"index_name": "default",
"embedding_path": "embeddings",
"embedding_dimensions": 1536,
"id_fields": ["field1", "field2"],
"create_md5": true,
"batch_size": 100
}
}'
Expected response for all source registrations:
{
"message": "Source registered successfully"
}
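Once a sync interval has elapsed, you can spot-check that chunks are landing in the destination collection. A sketch with pymongo; the database, collection, and embeddings names must match your registration payload:

from pymongo import MongoClient

client = MongoClient("<your-mongodb-connection-string>")
coll = client["<your-db-name>"]["<your-collection-name>"]

# Count ingested chunks and inspect one document's embedding length
print("documents:", coll.count_documents({}))
doc = coll.find_one({}, {"embeddings": 1})
if doc and "embeddings" in doc:
    print("embedding dimensions:", len(doc["embeddings"]))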
Monitoring and Logging
- Logs are stored in local/logs/MAAP-Loader.log
- Monitor MongoDB operations through the Atlas dashboard
- Check processing status in the application logs
Container Deployment
Build and run using Docker:
docker build -t maap-data-loader .
docker run -d --env-file .env maap-data-loader
Architecture
+--------------------------+
|       Data Sources       |
|  (Files, APIs, Streams)  |
+--------------------------+
             ↓
+--------------------------+
|    Document Processor    |
|  (Parse, Clean, Format)  |
+--------------------------+
             ↓
+--------------------------+
|   Embedding Generator    |
| (AI Service Integration) |
+--------------------------+
             ↓
+--------------------------+
|      MongoDB Atlas       |
|   (Storage & Indexing)   |
+--------------------------+
Detailed Architecture
Component Description
Input Layer
- Data Sources: Various input sources including files, APIs, and streams
- File Upload: Handles file ingestion and initial validation
Processing Layer
- Enterprise Services
  - Pipeline Executor: Orchestrates data processing workflows
  - MongoDB Ingest: Handles data ingestion into MongoDB
  - Configuration Components: Manages processing settings
- Local Services
  - Document Service: Processes and transforms documents
  - Bedrock Service: AWS Bedrock integration for AI capabilities
  - Embedding Service: Generates embeddings for documents (see the sketch below)
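For the Embedding Service referenced above, here is a rough illustration of what a Bedrock embedding call can look like. This README does not specify the model embedding_service.py uses, so the Titan model ID and request shape are assumptions:

import json

import boto3

# Hypothetical example: invoke an Amazon Titan embedding model via Bedrock Runtime.
# The model ID and request format used by the actual service may differ.
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
response = bedrock.invoke_model(
    modelId="amazon.titan-embed-text-v1",
    body=json.dumps({"inputText": "MongoDB Atlas stores and indexes the chunks."}),
)
embedding = json.loads(response["body"].read())["embedding"]
print(len(embedding))  # Titan v1 embeddings are 1536-dimensional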
Storage Layer
- MongoDB: Primary data store
- Uploaded Files: Temporary storage for processed files
- Logs: Application logging and monitoring
Utility Layer
- Error Utils: Error handling and reporting
- File Utils: File system operations
- Logger: Logging and monitoring utilities
Best Practices
- Always use virtual environments
- Keep sensitive information in environment variables
- Back up processed data regularly
- Monitor system resources during large-scale processing
- Use appropriate indexes in MongoDB for better query performance
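For the last point, the index this pipeline typically needs is an Atlas Vector Search index on the embedding field. A hedged sketch using pymongo's create_search_index (requires a recent pymongo; the database, collection, field, and dimension values mirror the upload examples and should be adjusted to your configuration):

from pymongo import MongoClient
from pymongo.operations import SearchIndexModel

client = MongoClient("<your-mongodb-connection-string>")
coll = client["maap_db"]["documents"]

# Define a vectorSearch index over the embedding field used in the upload examples
index = SearchIndexModel(
    definition={
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 1536,
                "similarity": "cosine",
            }
        ]
    },
    name="vector_index",
    type="vectorSearch",
)
coll.create_search_index(model=index)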
Troubleshooting
Common issues and solutions:
- Connection Errors: Verify MongoDB URI and network connectivity
- Memory Issues: Check document size and processing batch size
- Processing Errors: Verify file formats and permissions
- AWS Integration: Confirm AWS credentials and permissions
Contributing
- Fork the repository
- Create a feature branch
- Submit a pull request with detailed description
- Ensure tests pass and code meets style guidelines
License
This project is licensed under the MIT License - see the LICENSE file for details.