-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoutbound_example.py
More file actions
140 lines (119 loc) · 5.2 KB
/
outbound_example.py
File metadata and controls
140 lines (119 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
"""
Example usage of the LinkedIn outbound automation tools.
This script demonstrates how to use the outbound tools to:
1. Process contacts from a CSV file
2. Scrape LinkedIn profiles
3. Generate personalized cold emails and SMS
4. Send messages via CCAI
"""
import asyncio
import json
from linkedin_mcp_server.tools.outbound import (
process_contacts_csv,
batch_generate_outbound,
send_outbound_messages,
run_full_outbound_campaign
)
async def example_step_by_step():
"""Example of running the outbound process step by step."""
print("=== Step-by-Step Outbound Campaign ===\n")
# Step 1: Process CSV and scrape LinkedIn profiles
print("Step 1: Processing contacts from CSV...")
contacts_result = await process_contacts_csv(
csv_file_path="sample_contacts.csv",
linkedin_column="linkedin_url",
name_column="name",
email_column="email",
phone_column="phone",
company_column="company",
limit=2 # Process only first 2 contacts for demo
)
if not contacts_result.get("success"):
print(f"Error processing contacts: {contacts_result}")
return
print(f"✓ Processed {contacts_result['processed_count']} contacts")
print(f"✓ Found {len(contacts_result['contacts'])} valid profiles")
if contacts_result.get("errors"):
print(f"⚠ Errors: {contacts_result['errors']}")
print()
# Step 2: Generate outbound messages
print("Step 2: Generating personalized messages...")
messages_result = await batch_generate_outbound(
contacts_data=contacts_result["contacts"],
sender_name="Andreas",
sender_company="YourCompany",
email_template="professional",
sms_template="brief",
custom_email_message="I'd love to discuss potential collaboration opportunities."
)
if not messages_result.get("success"):
print(f"Error generating messages: {messages_result}")
return
print(f"✓ Generated messages for {messages_result['generated_count']} contacts")
# Show sample generated content
if messages_result["messages"]:
sample = messages_result["messages"][0]
print(f"\nSample Email for {sample['recipient_name']}:")
print(f"Subject: {sample['email']['subject']}")
print(f"Body: {sample['email']['body'][:100]}...")
print(f"\nSample SMS: {sample['sms']['message'][:100]}...")
print()
# Step 3: Send via CCAI (commented out - requires real API key)
print("Step 3: Sending messages via CCAI...")
print("⚠ Skipping actual send - requires valid CCAI API key")
print("To send messages, uncomment the code below and provide your CCAI API key")
# Uncomment and configure with your CCAI credentials:
# send_result = await send_outbound_messages(
# messages_data=messages_result["messages"],
# ccai_api_key="your-ccai-api-key",
# from_email="your-email@company.com",
# from_name="Your Name",
# from_phone="+1-555-0100",
# send_emails=True,
# send_sms=True
# )
# print(f"✓ Sent {send_result.get('emails_sent', 0)} emails")
# print(f"✓ Sent {send_result.get('sms_sent', 0)} SMS messages")
async def example_full_campaign():
"""Example of running a complete campaign in one call."""
print("\n=== Full Campaign (One Command) ===\n")
# Run complete campaign (commented out - requires real API key)
print("Running full outbound campaign...")
print("⚠ Skipping actual campaign - requires valid CCAI API key")
# Uncomment and configure with your CCAI credentials:
# campaign_result = await run_full_outbound_campaign(
# csv_file_path="sample_contacts.csv",
# sender_name="Andreas",
# sender_company="YourCompany",
# ccai_api_key="your-ccai-api-key",
# email_template="professional",
# sms_template="brief",
# custom_email_message="I'd love to discuss potential partnerships.",
# from_email="your-email@company.com",
# from_name="Your Name",
# from_phone="+1-555-0100",
# limit=5
# )
#
# if campaign_result.get("success"):
# summary = campaign_result["campaign_summary"]
# print(f"✓ Campaign completed successfully!")
# print(f"✓ Contacts processed: {summary['contacts_processed']}")
# print(f"✓ Messages generated: {summary['messages_generated']}")
# print(f"✓ Emails sent: {summary['emails_sent']}")
# print(f"✓ SMS sent: {summary['sms_sent']}")
# else:
# print(f"✗ Campaign failed: {campaign_result}")
if __name__ == "__main__":
print("LinkedIn Outbound Automation Example")
print("====================================")
# Run examples
asyncio.run(example_step_by_step())
asyncio.run(example_full_campaign())
print("\n=== Configuration Notes ===")
print("1. Update sample_contacts.csv with your actual contacts")
print("2. Get your CCAI API key and configure the send functions")
print("3. Set up your LinkedIn cookie in the MCP server")
print("4. Customize email/SMS templates as needed")
print("5. Test with a small batch first (use limit parameter)")