mirror of
https://github.com/ZizzyDizzyMC/linx-server.git
synced 2026-01-23 02:14:33 +00:00
Add S3 backend (#156)
This commit is contained in:
parent
0fb5fa1c51
commit
5d9a93b1e2
21 changed files with 738 additions and 441 deletions
|
|
@ -1,63 +1,149 @@
|
|||
package localfs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/andreimarcu/linx-server/backends"
|
||||
"github.com/andreimarcu/linx-server/helpers"
|
||||
)
|
||||
|
||||
type LocalfsBackend struct {
|
||||
basePath string
|
||||
metaPath string
|
||||
filesPath string
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Delete(key string) error {
|
||||
return os.Remove(path.Join(b.basePath, key))
|
||||
type MetadataJSON struct {
|
||||
DeleteKey string `json:"delete_key"`
|
||||
Sha256sum string `json:"sha256sum"`
|
||||
Mimetype string `json:"mimetype"`
|
||||
Size int64 `json:"size"`
|
||||
Expiry int64 `json:"expiry"`
|
||||
ArchiveFiles []string `json:"archive_files,omitempty"`
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Delete(key string) (err error) {
|
||||
err = os.Remove(path.Join(b.filesPath, key))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = os.Remove(path.Join(b.metaPath, key))
|
||||
return
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Exists(key string) (bool, error) {
|
||||
_, err := os.Stat(path.Join(b.basePath, key))
|
||||
_, err := os.Stat(path.Join(b.filesPath, key))
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Get(key string) ([]byte, error) {
|
||||
return ioutil.ReadFile(path.Join(b.basePath, key))
|
||||
func (b LocalfsBackend) Head(key string) (metadata backends.Metadata, err error) {
|
||||
f, err := os.Open(path.Join(b.metaPath, key))
|
||||
if os.IsNotExist(err) {
|
||||
return metadata, backends.NotFoundErr
|
||||
} else if err != nil {
|
||||
return metadata, backends.BadMetadata
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
decoder := json.NewDecoder(f)
|
||||
|
||||
mjson := MetadataJSON{}
|
||||
if err := decoder.Decode(&mjson); err != nil {
|
||||
return metadata, backends.BadMetadata
|
||||
}
|
||||
|
||||
metadata.DeleteKey = mjson.DeleteKey
|
||||
metadata.Mimetype = mjson.Mimetype
|
||||
metadata.ArchiveFiles = mjson.ArchiveFiles
|
||||
metadata.Sha256sum = mjson.Sha256sum
|
||||
metadata.Expiry = time.Unix(mjson.Expiry, 0)
|
||||
metadata.Size = mjson.Size
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Put(key string, r io.Reader) (int64, error) {
|
||||
dst, err := os.Create(path.Join(b.basePath, key))
|
||||
func (b LocalfsBackend) Get(key string) (metadata backends.Metadata, f io.ReadCloser, err error) {
|
||||
metadata, err = b.Head(key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return
|
||||
}
|
||||
|
||||
f, err = os.Open(path.Join(b.filesPath, key))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) writeMetadata(key string, metadata backends.Metadata) error {
|
||||
metaPath := path.Join(b.metaPath, key)
|
||||
|
||||
mjson := MetadataJSON{
|
||||
DeleteKey: metadata.DeleteKey,
|
||||
Mimetype: metadata.Mimetype,
|
||||
ArchiveFiles: metadata.ArchiveFiles,
|
||||
Sha256sum: metadata.Sha256sum,
|
||||
Expiry: metadata.Expiry.Unix(),
|
||||
Size: metadata.Size,
|
||||
}
|
||||
|
||||
dst, err := os.Create(metaPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dst.Close()
|
||||
|
||||
encoder := json.NewEncoder(dst)
|
||||
err = encoder.Encode(mjson)
|
||||
if err != nil {
|
||||
os.Remove(metaPath)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Put(key string, r io.Reader, expiry time.Time, deleteKey string) (m backends.Metadata, err error) {
|
||||
filePath := path.Join(b.filesPath, key)
|
||||
|
||||
dst, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer dst.Close()
|
||||
|
||||
bytes, err := io.Copy(dst, r)
|
||||
if bytes == 0 {
|
||||
b.Delete(key)
|
||||
return bytes, errors.New("Empty file")
|
||||
os.Remove(filePath)
|
||||
return m, backends.FileEmptyError
|
||||
} else if err != nil {
|
||||
b.Delete(key)
|
||||
return bytes, err
|
||||
os.Remove(filePath)
|
||||
return m, err
|
||||
}
|
||||
|
||||
return bytes, err
|
||||
}
|
||||
m.Expiry = expiry
|
||||
m.DeleteKey = deleteKey
|
||||
m.Size = bytes
|
||||
m.Mimetype, _ = helpers.DetectMime(dst)
|
||||
m.Sha256sum, _ = helpers.Sha256sum(dst)
|
||||
m.ArchiveFiles, _ = helpers.ListArchiveFiles(m.Mimetype, m.Size, dst)
|
||||
|
||||
func (b LocalfsBackend) Open(key string) (backends.ReadSeekCloser, error) {
|
||||
return os.Open(path.Join(b.basePath, key))
|
||||
}
|
||||
err = b.writeMetadata(key, m)
|
||||
if err != nil {
|
||||
os.Remove(filePath)
|
||||
return
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) ServeFile(key string, w http.ResponseWriter, r *http.Request) {
|
||||
filePath := path.Join(b.basePath, key)
|
||||
http.ServeFile(w, r, filePath)
|
||||
return
|
||||
}
|
||||
|
||||
func (b LocalfsBackend) Size(key string) (int64, error) {
|
||||
fileInfo, err := os.Stat(path.Join(b.basePath, key))
|
||||
fileInfo, err := os.Stat(path.Join(b.filesPath, key))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
@ -68,7 +154,7 @@ func (b LocalfsBackend) Size(key string) (int64, error) {
|
|||
func (b LocalfsBackend) List() ([]string, error) {
|
||||
var output []string
|
||||
|
||||
files, err := ioutil.ReadDir(b.basePath)
|
||||
files, err := ioutil.ReadDir(b.filesPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -80,6 +166,9 @@ func (b LocalfsBackend) List() ([]string, error) {
|
|||
return output, nil
|
||||
}
|
||||
|
||||
func NewLocalfsBackend(basePath string) LocalfsBackend {
|
||||
return LocalfsBackend{basePath: basePath}
|
||||
func NewLocalfsBackend(metaPath string, filesPath string) LocalfsBackend {
|
||||
return LocalfsBackend{
|
||||
metaPath: metaPath,
|
||||
filesPath: filesPath,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,11 +5,6 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
type MetaBackend interface {
|
||||
Get(key string) (Metadata, error)
|
||||
Put(key string, metadata *Metadata) error
|
||||
}
|
||||
|
||||
type Metadata struct {
|
||||
DeleteKey string
|
||||
Sha256sum string
|
||||
|
|
|
|||
|
|
@ -1,70 +0,0 @@
|
|||
package metajson
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/andreimarcu/linx-server/backends"
|
||||
)
|
||||
|
||||
type MetadataJSON struct {
|
||||
DeleteKey string `json:"delete_key"`
|
||||
Sha256sum string `json:"sha256sum"`
|
||||
Mimetype string `json:"mimetype"`
|
||||
Size int64 `json:"size"`
|
||||
Expiry int64 `json:"expiry"`
|
||||
ArchiveFiles []string `json:"archive_files,omitempty"`
|
||||
}
|
||||
|
||||
type MetaJSONBackend struct {
|
||||
storage backends.MetaStorageBackend
|
||||
}
|
||||
|
||||
func (m MetaJSONBackend) Put(key string, metadata *backends.Metadata) error {
|
||||
mjson := MetadataJSON{}
|
||||
mjson.DeleteKey = metadata.DeleteKey
|
||||
mjson.Mimetype = metadata.Mimetype
|
||||
mjson.ArchiveFiles = metadata.ArchiveFiles
|
||||
mjson.Sha256sum = metadata.Sha256sum
|
||||
mjson.Expiry = metadata.Expiry.Unix()
|
||||
mjson.Size = metadata.Size
|
||||
|
||||
byt, err := json.Marshal(mjson)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := m.storage.Put(key, bytes.NewBuffer(byt)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m MetaJSONBackend) Get(key string) (metadata backends.Metadata, err error) {
|
||||
b, err := m.storage.Get(key)
|
||||
if err != nil {
|
||||
return metadata, backends.BadMetadata
|
||||
}
|
||||
|
||||
mjson := MetadataJSON{}
|
||||
|
||||
err = json.Unmarshal(b, &mjson)
|
||||
if err != nil {
|
||||
return metadata, backends.BadMetadata
|
||||
}
|
||||
|
||||
metadata.DeleteKey = mjson.DeleteKey
|
||||
metadata.Mimetype = mjson.Mimetype
|
||||
metadata.ArchiveFiles = mjson.ArchiveFiles
|
||||
metadata.Sha256sum = mjson.Sha256sum
|
||||
metadata.Expiry = time.Unix(mjson.Expiry, 0)
|
||||
metadata.Size = mjson.Size
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func NewMetaJSONBackend(storage backends.MetaStorageBackend) MetaJSONBackend {
|
||||
return MetaJSONBackend{storage: storage}
|
||||
}
|
||||
192
backends/s3/s3.go
Normal file
192
backends/s3/s3.go
Normal file
|
|
@ -0,0 +1,192 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/andreimarcu/linx-server/backends"
|
||||
"github.com/andreimarcu/linx-server/helpers"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
)
|
||||
|
||||
type S3Backend struct {
|
||||
bucket string
|
||||
svc *s3.S3
|
||||
}
|
||||
|
||||
func (b S3Backend) Delete(key string) error {
|
||||
_, err := b.svc.DeleteObject(&s3.DeleteObjectInput{
|
||||
Bucket: aws.String(b.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b S3Backend) Exists(key string) (bool, error) {
|
||||
_, err := b.svc.HeadObject(&s3.HeadObjectInput{
|
||||
Bucket: aws.String(b.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
func (b S3Backend) Head(key string) (metadata backends.Metadata, err error) {
|
||||
var result *s3.HeadObjectOutput
|
||||
result, err = b.svc.HeadObject(&s3.HeadObjectInput{
|
||||
Bucket: aws.String(b.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
if err != nil {
|
||||
if aerr, ok := err.(awserr.Error); ok {
|
||||
if aerr.Code() == s3.ErrCodeNoSuchKey || aerr.Code() == "NotFound" {
|
||||
err = backends.NotFoundErr
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
metadata, err = unmapMetadata(result.Metadata)
|
||||
return
|
||||
}
|
||||
|
||||
func (b S3Backend) Get(key string) (metadata backends.Metadata, r io.ReadCloser, err error) {
|
||||
var result *s3.GetObjectOutput
|
||||
result, err = b.svc.GetObject(&s3.GetObjectInput{
|
||||
Bucket: aws.String(b.bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
if err != nil {
|
||||
if aerr, ok := err.(awserr.Error); ok {
|
||||
if aerr.Code() == s3.ErrCodeNoSuchKey || aerr.Code() == "NotFound" {
|
||||
err = backends.NotFoundErr
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
metadata, err = unmapMetadata(result.Metadata)
|
||||
r = result.Body
|
||||
return
|
||||
}
|
||||
|
||||
func mapMetadata(m backends.Metadata) map[string]*string {
|
||||
return map[string]*string{
|
||||
"Expiry": aws.String(strconv.FormatInt(m.Expiry.Unix(), 10)),
|
||||
"Delete_key": aws.String(m.DeleteKey),
|
||||
"Size": aws.String(strconv.FormatInt(m.Size, 10)),
|
||||
"Mimetype": aws.String(m.Mimetype),
|
||||
"Sha256sum": aws.String(m.Sha256sum),
|
||||
}
|
||||
}
|
||||
|
||||
func unmapMetadata(input map[string]*string) (m backends.Metadata, err error) {
|
||||
expiry, err := strconv.ParseInt(aws.StringValue(input["Expiry"]), 10, 64)
|
||||
if err != nil {
|
||||
return m, err
|
||||
}
|
||||
m.Expiry = time.Unix(expiry, 0)
|
||||
|
||||
m.Size, err = strconv.ParseInt(aws.StringValue(input["Size"]), 10, 64)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.DeleteKey = aws.StringValue(input["Delete_key"])
|
||||
m.Mimetype = aws.StringValue(input["Mimetype"])
|
||||
m.Sha256sum = aws.StringValue(input["Sha256sum"])
|
||||
return
|
||||
}
|
||||
|
||||
func (b S3Backend) Put(key string, r io.Reader, expiry time.Time, deleteKey string) (m backends.Metadata, err error) {
|
||||
tmpDst, err := ioutil.TempFile("", "linx-server-upload")
|
||||
if err != nil {
|
||||
return m, err
|
||||
}
|
||||
defer tmpDst.Close()
|
||||
defer os.Remove(tmpDst.Name())
|
||||
|
||||
bytes, err := io.Copy(tmpDst, r)
|
||||
if bytes == 0 {
|
||||
return m, backends.FileEmptyError
|
||||
} else if err != nil {
|
||||
return m, err
|
||||
}
|
||||
|
||||
m.Expiry = expiry
|
||||
m.DeleteKey = deleteKey
|
||||
m.Size = bytes
|
||||
m.Mimetype, _ = helpers.DetectMime(tmpDst)
|
||||
m.Sha256sum, _ = helpers.Sha256sum(tmpDst)
|
||||
// XXX: we may not be able to write this to AWS easily
|
||||
//m.ArchiveFiles, _ = helpers.ListArchiveFiles(m.Mimetype, m.Size, tmpDst)
|
||||
|
||||
uploader := s3manager.NewUploaderWithClient(b.svc)
|
||||
input := &s3manager.UploadInput{
|
||||
Bucket: aws.String(b.bucket),
|
||||
Key: aws.String(key),
|
||||
Body: tmpDst,
|
||||
Metadata: mapMetadata(m),
|
||||
}
|
||||
_, err = uploader.Upload(input)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (b S3Backend) Size(key string) (int64, error) {
|
||||
input := &s3.HeadObjectInput{
|
||||
Bucket: aws.String(b.bucket),
|
||||
Key: aws.String(key),
|
||||
}
|
||||
result, err := b.svc.HeadObject(input)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return *result.ContentLength, nil
|
||||
}
|
||||
|
||||
func (b S3Backend) List() ([]string, error) {
|
||||
var output []string
|
||||
input := &s3.ListObjectsInput{
|
||||
Bucket: aws.String(b.bucket),
|
||||
}
|
||||
|
||||
results, err := b.svc.ListObjects(input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
for _, object := range results.Contents {
|
||||
output = append(output, *object.Key)
|
||||
}
|
||||
|
||||
return output, nil
|
||||
}
|
||||
|
||||
func NewS3Backend(bucket string, region string, endpoint string) S3Backend {
|
||||
awsConfig := &aws.Config{}
|
||||
if region != "" {
|
||||
awsConfig.Region = aws.String(region)
|
||||
}
|
||||
if endpoint != "" {
|
||||
awsConfig.Endpoint = aws.String(endpoint)
|
||||
}
|
||||
|
||||
sess := session.Must(session.NewSession(awsConfig))
|
||||
svc := s3.New(sess)
|
||||
return S3Backend{bucket: bucket, svc: svc}
|
||||
}
|
||||
|
|
@ -1,24 +1,17 @@
|
|||
package backends
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ReadSeekCloser interface {
|
||||
io.Reader
|
||||
io.Closer
|
||||
io.Seeker
|
||||
io.ReaderAt
|
||||
}
|
||||
|
||||
type StorageBackend interface {
|
||||
Delete(key string) error
|
||||
Exists(key string) (bool, error)
|
||||
Get(key string) ([]byte, error)
|
||||
Put(key string, r io.Reader) (int64, error)
|
||||
Open(key string) (ReadSeekCloser, error)
|
||||
ServeFile(key string, w http.ResponseWriter, r *http.Request)
|
||||
Head(key string) (Metadata, error)
|
||||
Get(key string) (Metadata, io.ReadCloser, error)
|
||||
Put(key string, r io.Reader, expiry time.Time, deleteKey string) (Metadata, error)
|
||||
Size(key string) (int64, error)
|
||||
}
|
||||
|
||||
|
|
@ -26,3 +19,6 @@ type MetaStorageBackend interface {
|
|||
StorageBackend
|
||||
List() ([]string, error)
|
||||
}
|
||||
|
||||
var NotFoundErr = errors.New("File not found.")
|
||||
var FileEmptyError = errors.New("Empty file")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue