1package s323import (4 "context"5 "fmt"6 "io"7 "net/http"89 "github.com/foxcpp/maddy/framework/config"10 "github.com/foxcpp/maddy/framework/log"11 "github.com/foxcpp/maddy/framework/module"12 "github.com/minio/minio-go/v7"13 "github.com/minio/minio-go/v7/pkg/credentials"14)1516const modName = "storage.blob.s3"1718const (19 credsTypeFileMinio = "file_minio"20 credsTypeFileAWS = "file_aws"21 credsTypeAccessKey = "access_key"22 credsTypeIAM = "iam"23 credsTypeDefault = credsTypeAccessKey24)2526type Store struct {27 instName string28 log log.Logger2930 endpoint string31 cl *minio.Client3233 bucketName string34 objectPrefix string35}3637func New(_, instName string, _, inlineArgs []string) (module.Module, error) {38 if len(inlineArgs) != 0 {39 return nil, fmt.Errorf("%s: expected 0 arguments", modName)40 }4142 return &Store{43 instName: instName,44 log: log.Logger{Name: modName},45 }, nil46}4748func (s *Store) Init(cfg *config.Map) error {49 var (50 secure bool51 accessKeyID string52 secretAccessKey string53 credsType string54 location string55 )56 cfg.String("endpoint", false, true, "", &s.endpoint)57 cfg.Bool("secure", false, true, &secure)58 cfg.String("access_key", false, true, "", &accessKeyID)59 cfg.String("secret_key", false, true, "", &secretAccessKey)60 cfg.String("bucket", false, true, "", &s.bucketName)61 cfg.String("region", false, false, "", &location)62 cfg.String("object_prefix", false, false, "", &s.objectPrefix)63 cfg.String("creds", false, false, credsTypeDefault, &credsType)6465 if _, err := cfg.Process(); err != nil {66 return err67 }68 if s.endpoint == "" {69 return fmt.Errorf("%s: endpoint not set", modName)70 }7172 var creds *credentials.Credentials7374 switch credsType {75 case credsTypeFileMinio:76 creds = credentials.NewFileMinioClient("", "")77 case credsTypeFileAWS:78 creds = credentials.NewFileAWSCredentials("", "")79 case credsTypeIAM:80 creds = credentials.NewIAM("")81 case credsTypeAccessKey:82 creds = credentials.NewStaticV4(accessKeyID, secretAccessKey, "")83 default:84 creds = credentials.NewStaticV4(accessKeyID, secretAccessKey, "")85 }8687 cl, err := minio.New(s.endpoint, &minio.Options{88 Creds: creds,89 Secure: secure,90 Region: location,91 })92 if err != nil {93 return fmt.Errorf("%s: %w", modName, err)94 }9596 s.cl = cl97 return nil98}99100func (s *Store) Name() string {101 return modName102}103104func (s *Store) InstanceName() string {105 return s.instName106}107108type s3blob struct {109 pw *io.PipeWriter110 didSync bool111 errCh chan error112}113114func (b *s3blob) Sync() error {115 // We do this in Sync instead of Close because116 // backend may not actually check the error of Close.117 // The problematic restriction is that Sync can now be called118 // only once.119 if b.didSync {120 panic("storage.blob.s3: Sync called twice for a blob object")121 }122123 b.pw.Close()124 b.didSync = true125 return <-b.errCh126}127128func (b *s3blob) Write(p []byte) (n int, err error) {129 return b.pw.Write(p)130}131132func (b *s3blob) Close() error {133 if !b.didSync {134 if err := b.pw.CloseWithError(fmt.Errorf("storage.blob.s3: blob closed without Sync")); err != nil {135 panic(err)136 }137 }138 return nil139}140141func (s *Store) Create(ctx context.Context, key string, blobSize int64) (module.Blob, error) {142 pr, pw := io.Pipe()143 errCh := make(chan error, 1)144145 go func() {146 partSize := uint64(0)147 if blobSize == module.UnknownBlobSize {148 // Without this, minio-go will allocate 500 MiB buffer which149 // is a little too much.150 // https://github.com/minio/minio-go/issues/1478151 partSize = 1 * 1024 * 1024 /* 1 MiB */152 }153 _, err := s.cl.PutObject(ctx, s.bucketName, s.objectPrefix+key, pr, blobSize, minio.PutObjectOptions{154 PartSize: partSize,155 })156 if err != nil {157 if err := pr.CloseWithError(fmt.Errorf("s3 PutObject: %w", err)); err != nil {158 panic(err)159 }160 }161 errCh <- err162 }()163164 return &s3blob{165 pw: pw,166 errCh: errCh,167 }, nil168}169170func (s *Store) Open(ctx context.Context, key string) (io.ReadCloser, error) {171 obj, err := s.cl.GetObject(ctx, s.bucketName, s.objectPrefix+key, minio.GetObjectOptions{})172 if err != nil {173 resp := minio.ToErrorResponse(err)174 if resp.StatusCode == http.StatusNotFound {175 return nil, module.ErrNoSuchBlob176 }177 return nil, err178 }179 return obj, nil180}181182func (s *Store) Delete(ctx context.Context, keys []string) error {183 var lastErr error184 for _, k := range keys {185 lastErr = s.cl.RemoveObject(ctx, s.bucketName, s.objectPrefix+k, minio.RemoveObjectOptions{})186 if lastErr != nil {187 s.log.Error("failed to delete object", lastErr, s.objectPrefix+k)188 }189 }190 return lastErr191}192193func init() {194 var _ module.BlobStore = &Store{}195 module.Register(modName, New)196}