maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1package s3
  2
  3import (
  4	"context"
  5	"fmt"
  6	"io"
  7	"net/http"
  8
  9	"github.com/foxcpp/maddy/framework/config"
 10	"github.com/foxcpp/maddy/framework/log"
 11	"github.com/foxcpp/maddy/framework/module"
 12	"github.com/minio/minio-go/v7"
 13	"github.com/minio/minio-go/v7/pkg/credentials"
 14)
 15
 16const modName = "storage.blob.s3"
 17
 18const (
 19	credsTypeFileMinio = "file_minio"
 20	credsTypeFileAWS   = "file_aws"
 21	credsTypeAccessKey = "access_key"
 22	credsTypeIAM       = "iam"
 23	credsTypeDefault   = credsTypeAccessKey
 24)
 25
 26type Store struct {
 27	instName string
 28	log      log.Logger
 29
 30	endpoint string
 31	cl       *minio.Client
 32
 33	bucketName   string
 34	objectPrefix string
 35}
 36
 37func New(_, instName string, _, inlineArgs []string) (module.Module, error) {
 38	if len(inlineArgs) != 0 {
 39		return nil, fmt.Errorf("%s: expected 0 arguments", modName)
 40	}
 41
 42	return &Store{
 43		instName: instName,
 44		log:      log.Logger{Name: modName},
 45	}, nil
 46}
 47
 48func (s *Store) Init(cfg *config.Map) error {
 49	var (
 50		secure          bool
 51		accessKeyID     string
 52		secretAccessKey string
 53		credsType       string
 54		location        string
 55	)
 56	cfg.String("endpoint", false, true, "", &s.endpoint)
 57	cfg.Bool("secure", false, true, &secure)
 58	cfg.String("access_key", false, true, "", &accessKeyID)
 59	cfg.String("secret_key", false, true, "", &secretAccessKey)
 60	cfg.String("bucket", false, true, "", &s.bucketName)
 61	cfg.String("region", false, false, "", &location)
 62	cfg.String("object_prefix", false, false, "", &s.objectPrefix)
 63	cfg.String("creds", false, false, credsTypeDefault, &credsType)
 64
 65	if _, err := cfg.Process(); err != nil {
 66		return err
 67	}
 68	if s.endpoint == "" {
 69		return fmt.Errorf("%s: endpoint not set", modName)
 70	}
 71
 72	var creds *credentials.Credentials
 73
 74	switch credsType {
 75	case credsTypeFileMinio:
 76		creds = credentials.NewFileMinioClient("", "")
 77	case credsTypeFileAWS:
 78		creds = credentials.NewFileAWSCredentials("", "")
 79	case credsTypeIAM:
 80		creds = credentials.NewIAM("")
 81	case credsTypeAccessKey:
 82		creds = credentials.NewStaticV4(accessKeyID, secretAccessKey, "")
 83	default:
 84		creds = credentials.NewStaticV4(accessKeyID, secretAccessKey, "")
 85	}
 86
 87	cl, err := minio.New(s.endpoint, &minio.Options{
 88		Creds:  creds,
 89		Secure: secure,
 90		Region: location,
 91	})
 92	if err != nil {
 93		return fmt.Errorf("%s: %w", modName, err)
 94	}
 95
 96	s.cl = cl
 97	return nil
 98}
 99
100func (s *Store) Name() string {
101	return modName
102}
103
104func (s *Store) InstanceName() string {
105	return s.instName
106}
107
108type s3blob struct {
109	pw      *io.PipeWriter
110	didSync bool
111	errCh   chan error
112}
113
114func (b *s3blob) Sync() error {
115	// We do this in Sync instead of Close because
116	// backend may not actually check the error of Close.
117	// The problematic restriction is that Sync can now be called
118	// only once.
119	if b.didSync {
120		panic("storage.blob.s3: Sync called twice for a blob object")
121	}
122
123	b.pw.Close()
124	b.didSync = true
125	return <-b.errCh
126}
127
128func (b *s3blob) Write(p []byte) (n int, err error) {
129	return b.pw.Write(p)
130}
131
132func (b *s3blob) Close() error {
133	if !b.didSync {
134		if err := b.pw.CloseWithError(fmt.Errorf("storage.blob.s3: blob closed without Sync")); err != nil {
135			panic(err)
136		}
137	}
138	return nil
139}
140
141func (s *Store) Create(ctx context.Context, key string, blobSize int64) (module.Blob, error) {
142	pr, pw := io.Pipe()
143	errCh := make(chan error, 1)
144
145	go func() {
146		partSize := uint64(0)
147		if blobSize == module.UnknownBlobSize {
148			// Without this, minio-go will allocate 500 MiB buffer which
149			// is a little too much.
150			// https://github.com/minio/minio-go/issues/1478
151			partSize = 1 * 1024 * 1024 /* 1 MiB */
152		}
153		_, err := s.cl.PutObject(ctx, s.bucketName, s.objectPrefix+key, pr, blobSize, minio.PutObjectOptions{
154			PartSize: partSize,
155		})
156		if err != nil {
157			if err := pr.CloseWithError(fmt.Errorf("s3 PutObject: %w", err)); err != nil {
158				panic(err)
159			}
160		}
161		errCh <- err
162	}()
163
164	return &s3blob{
165		pw:    pw,
166		errCh: errCh,
167	}, nil
168}
169
170func (s *Store) Open(ctx context.Context, key string) (io.ReadCloser, error) {
171	obj, err := s.cl.GetObject(ctx, s.bucketName, s.objectPrefix+key, minio.GetObjectOptions{})
172	if err != nil {
173		resp := minio.ToErrorResponse(err)
174		if resp.StatusCode == http.StatusNotFound {
175			return nil, module.ErrNoSuchBlob
176		}
177		return nil, err
178	}
179	return obj, nil
180}
181
182func (s *Store) Delete(ctx context.Context, keys []string) error {
183	var lastErr error
184	for _, k := range keys {
185		lastErr = s.cl.RemoveObject(ctx, s.bucketName, s.objectPrefix+k, minio.RemoveObjectOptions{})
186		if lastErr != nil {
187			s.log.Error("failed to delete object", lastErr, s.objectPrefix+k)
188		}
189	}
190	return lastErr
191}
192
193func init() {
194	var _ module.BlobStore = &Store{}
195	module.Register(modName, New)
196}