MailIO is an Apache Beam connector that enables reading emails from mail servers using the IMAP protocol. It provides a bounded source for processing email messages in batch operations.
- Read emails from IMAP mail servers
- Support for multiple folder selection
- Advanced search capabilities with combinable filters
- Secure authentication with username/password
- Bounded source implementation for batch processing
Here's a simple example of reading all emails from a mail server:
Pipeline pipeline = Pipeline.create(options);
PCollection<MailMessage> messages = pipeline.apply(
MailIO.read(
MailIO.MailConfiguration.create(
"imap.example.com", // host
"username@example.com", // username
"password" // password
)
)
);
You can specify which folders to read from:
PCollection<MailMessage> messages = pipeline.apply(
MailIO.read(
MailIO.MailConfiguration.create(
"imap.example.com",
"username@example.com",
"password",
Arrays.asList("Archive", "Sent Messages", "Drafts")
)
)
);
MailIO supports advanced search capabilities. You can filter emails based on various criteria:
PCollection<MailMessage> messages = pipeline.apply(
MailIO.read(config)
.withSearchFilter(
MailSearchFilter.subject("hello")
.and(MailSearchFilter.receivedDate(
MailSearchFilter.Condition.LE,
new Date()))
.or(MailSearchFilter.from("beam"))
)
);
Here's a complete example that demonstrates reading emails, applying filters, and processing the messages:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
// Read emails with filters
PCollection<MailMessage> messages =
pipeline.apply(
MailIO.read(config)
.withSearchFilter(
MailSearchFilter.subject("important")
.and(
MailSearchFilter.receivedDate(
MailSearchFilter.Condition.LE, new Date()))));
// Process the messages
messages.apply(
ParDo.of(
new DoFn<MailMessage, Void>() {
@ProcessElement
public void processElement(@Element MailMessage message) {
// Process each email message
System.out.println("Subject: " + message.getSubject());
System.out.println("From: " + message.getFrom());
System.out.println("Content: " + message.getContent());
}
}));
pipeline.run().waitUntilFinish();
messages.apply(ParDo.of(new DoFn<MailMessage, MailAttachment>() {
@ProcessElement
public void processElement(@Element MailMessage message, OutputReceiver<MailAttachment> out) {
List<MailAttachment> attachments = message.getAttachment();
if (attachments != null) {
for (MailAttachment attachment : attachments) {
out.output(attachment);
}
}
}
}));
Contributions are welcome! Please feel free to submit a Pull Request.