281 lines
6.7 KiB
Go
281 lines
6.7 KiB
Go
package email
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"crypto/tls"
|
||
"fmt"
|
||
"net"
|
||
"net/smtp"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"git.toowon.com/jimmy/go-common/config"
|
||
"git.toowon.com/jimmy/go-common/logger"
|
||
)
|
||
|
||
// Email 邮件发送器
|
||
type Email struct {
|
||
config *config.EmailConfig
|
||
|
||
async bool
|
||
queue chan emailTask
|
||
workers int
|
||
wg sync.WaitGroup
|
||
closed bool
|
||
mu sync.Mutex
|
||
dropped atomic.Uint64
|
||
}
|
||
|
||
type emailTask struct {
|
||
to []string
|
||
subject string
|
||
body string
|
||
htmlBody string
|
||
requestID string
|
||
}
|
||
|
||
// NewEmail 创建邮件发送器
|
||
func NewEmail(cfg *config.Config) *Email {
|
||
if cfg == nil || cfg.Email == nil {
|
||
return &Email{config: nil}
|
||
}
|
||
e := &Email{
|
||
config: cfg.Email,
|
||
async: cfg.Email.IsAsync(),
|
||
workers: cfg.Email.Workers,
|
||
}
|
||
if e.workers <= 0 {
|
||
e.workers = 2
|
||
}
|
||
queueSize := cfg.Email.QueueSize
|
||
if queueSize <= 0 {
|
||
queueSize = 1000
|
||
}
|
||
if e.async {
|
||
e.queue = make(chan emailTask, queueSize)
|
||
for i := 0; i < e.workers; i++ {
|
||
e.wg.Add(1)
|
||
go e.worker()
|
||
}
|
||
}
|
||
return e
|
||
}
|
||
|
||
func (e *Email) worker() {
|
||
defer e.wg.Done()
|
||
for task := range e.queue {
|
||
if err := e.SendEmail(task.to, task.subject, task.body, task.htmlBody); err != nil {
|
||
fields := map[string]any{
|
||
"error": err.Error(),
|
||
"request_id": task.requestID,
|
||
"to": task.to,
|
||
}
|
||
logger.FromContext(context.Background()).Error("async email send failed", fields)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (e *Email) getEmailConfig() (*config.EmailConfig, error) {
|
||
if e.config == nil {
|
||
return nil, fmt.Errorf("email config is nil")
|
||
}
|
||
if e.config.Host == "" {
|
||
return nil, fmt.Errorf("email host is required")
|
||
}
|
||
if e.config.Username == "" {
|
||
return nil, fmt.Errorf("email username is required")
|
||
}
|
||
if e.config.Password == "" {
|
||
return nil, fmt.Errorf("email password is required")
|
||
}
|
||
if e.config.Port == 0 {
|
||
e.config.Port = 587
|
||
}
|
||
if e.config.From == "" {
|
||
e.config.From = e.config.Username
|
||
}
|
||
if e.config.Timeout == 0 {
|
||
e.config.Timeout = 5
|
||
}
|
||
return e.config, nil
|
||
}
|
||
|
||
// Message 邮件消息
|
||
type Message struct {
|
||
To []string
|
||
Cc []string
|
||
Bcc []string
|
||
Subject string
|
||
Body string
|
||
HTMLBody string
|
||
}
|
||
|
||
// SendEmail 同步发送邮件(验证码等需等待结果的场景)
|
||
func (e *Email) SendEmail(to []string, subject, body string, htmlBody ...string) error {
|
||
cfg, err := e.getEmailConfig()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
msg := &Message{To: to, Subject: subject, Body: body}
|
||
if len(htmlBody) > 0 && htmlBody[0] != "" {
|
||
msg.HTMLBody = htmlBody[0]
|
||
}
|
||
return e.send(msg, cfg)
|
||
}
|
||
|
||
// SendEmailAsync 异步发送邮件(HTTP 通知类场景)
|
||
func (e *Email) SendEmailAsync(ctx context.Context, to []string, subject, body string, htmlBody ...string) {
|
||
task := emailTask{
|
||
to: append([]string(nil), to...),
|
||
subject: subject,
|
||
body: body,
|
||
requestID: logger.RequestIDFromContext(ctx),
|
||
}
|
||
if len(htmlBody) > 0 {
|
||
task.htmlBody = htmlBody[0]
|
||
}
|
||
if !e.async {
|
||
_ = e.SendEmail(task.to, task.subject, task.body, task.htmlBody)
|
||
return
|
||
}
|
||
select {
|
||
case e.queue <- task:
|
||
default:
|
||
e.dropped.Add(1)
|
||
logger.FromContext(ctx).Error("email queue full, task dropped", map[string]any{
|
||
"to": to,
|
||
})
|
||
}
|
||
}
|
||
|
||
// Close 关闭异步 worker
|
||
func (e *Email) Close() error {
|
||
if !e.async {
|
||
return nil
|
||
}
|
||
e.mu.Lock()
|
||
if e.closed {
|
||
e.mu.Unlock()
|
||
return nil
|
||
}
|
||
e.closed = true
|
||
e.mu.Unlock()
|
||
close(e.queue)
|
||
e.wg.Wait()
|
||
return nil
|
||
}
|
||
|
||
func (e *Email) send(msg *Message, cfg *config.EmailConfig) error {
|
||
if msg == nil {
|
||
return fmt.Errorf("message is nil")
|
||
}
|
||
if len(msg.To) == 0 {
|
||
return fmt.Errorf("recipients are required")
|
||
}
|
||
if msg.Subject == "" {
|
||
return fmt.Errorf("subject is required")
|
||
}
|
||
if msg.Body == "" && msg.HTMLBody == "" {
|
||
return fmt.Errorf("body or HTMLBody is required")
|
||
}
|
||
|
||
emailBody, err := e.buildEmailBody(msg, cfg)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to build email body: %w", err)
|
||
}
|
||
|
||
recipients := append(msg.To, msg.Cc...)
|
||
recipients = append(recipients, msg.Bcc...)
|
||
|
||
addr := net.JoinHostPort(cfg.Host, fmt.Sprintf("%d", cfg.Port))
|
||
auth := smtp.PlainAuth("", cfg.Username, cfg.Password, cfg.Host)
|
||
|
||
conn, err := net.DialTimeout("tcp", addr, time.Duration(cfg.Timeout)*time.Second)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to connect to SMTP server: %w", err)
|
||
}
|
||
defer conn.Close()
|
||
|
||
client, err := smtp.NewClient(conn, cfg.Host)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to create SMTP client: %w", err)
|
||
}
|
||
defer client.Close()
|
||
|
||
if cfg.UseSSL || cfg.UseTLS {
|
||
tlsConfig := &tls.Config{ServerName: cfg.Host}
|
||
if err := client.StartTLS(tlsConfig); err != nil {
|
||
return fmt.Errorf("failed to start TLS: %w", err)
|
||
}
|
||
}
|
||
|
||
if err := client.Auth(auth); err != nil {
|
||
return fmt.Errorf("failed to authenticate: %w", err)
|
||
}
|
||
if err := client.Mail(cfg.From); err != nil {
|
||
return fmt.Errorf("failed to set sender: %w", err)
|
||
}
|
||
for _, to := range recipients {
|
||
if err := client.Rcpt(to); err != nil {
|
||
return fmt.Errorf("failed to set recipient %s: %w", to, err)
|
||
}
|
||
}
|
||
writer, err := client.Data()
|
||
if err != nil {
|
||
return fmt.Errorf("failed to get data writer: %w", err)
|
||
}
|
||
if _, err = writer.Write(emailBody); err != nil {
|
||
_ = writer.Close()
|
||
return fmt.Errorf("failed to write email body: %w", err)
|
||
}
|
||
if err = writer.Close(); err != nil {
|
||
return fmt.Errorf("failed to close writer: %w", err)
|
||
}
|
||
if err := client.Quit(); err != nil {
|
||
return fmt.Errorf("failed to quit: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (e *Email) buildEmailBody(msg *Message, cfg *config.EmailConfig) ([]byte, error) {
|
||
var buf bytes.Buffer
|
||
from := cfg.From
|
||
if cfg.FromName != "" {
|
||
from = fmt.Sprintf("%s <%s>", cfg.FromName, cfg.From)
|
||
}
|
||
buf.WriteString(fmt.Sprintf("From: %s\r\n", from))
|
||
buf.WriteString(fmt.Sprintf("To: %s\r\n", joinEmails(msg.To)))
|
||
if len(msg.Cc) > 0 {
|
||
buf.WriteString(fmt.Sprintf("Cc: %s\r\n", joinEmails(msg.Cc)))
|
||
}
|
||
buf.WriteString(fmt.Sprintf("Subject: %s\r\n", msg.Subject))
|
||
|
||
if msg.HTMLBody != "" {
|
||
boundary := "----=_Part_" + fmt.Sprint(time.Now().UnixNano())
|
||
buf.WriteString("MIME-Version: 1.0\r\n")
|
||
buf.WriteString(fmt.Sprintf("Content-Type: multipart/alternative; boundary=\"%s\"\r\n\r\n", boundary))
|
||
buf.WriteString("--" + boundary + "\r\nContent-Type: text/plain; charset=UTF-8\r\n\r\n")
|
||
buf.WriteString(msg.Body + "\r\n")
|
||
buf.WriteString("--" + boundary + "\r\nContent-Type: text/html; charset=UTF-8\r\n\r\n")
|
||
buf.WriteString(msg.HTMLBody + "\r\n")
|
||
buf.WriteString("--" + boundary + "--\r\n")
|
||
} else {
|
||
buf.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n")
|
||
buf.WriteString(msg.Body + "\r\n")
|
||
}
|
||
return buf.Bytes(), nil
|
||
}
|
||
|
||
func joinEmails(emails []string) string {
|
||
if len(emails) == 0 {
|
||
return ""
|
||
}
|
||
result := emails[0]
|
||
for i := 1; i < len(emails); i++ {
|
||
result += ", " + emails[i]
|
||
}
|
||
return result
|
||
}
|