feat(files): buffer uploads to minio
Missing: batch job to transfer uploads to database
This commit is contained in:
parent
353b7704dc
commit
d9e9179a52
@ -146,6 +146,17 @@ memcached:
|
||||
timeout: "_env:MEMCACHED_TIMEOUT:20"
|
||||
expiration: "_env:MEMCACHED_EXPIRATION:300"
|
||||
|
||||
upload-cache:
|
||||
host: "_env:UPLOAD_S3_HOST:"
|
||||
port: "_env:UPLOAD_S3_PORT:9000"
|
||||
access-key: "_env:UPLOAD_S3_KEY_ID:"
|
||||
secret-key: "_env:UPLOAD_S3_KEY"
|
||||
is-secure: "_env:UPLOAD_S3_SSL:false"
|
||||
region: "_env:UPLOAD_S3_REGION:"
|
||||
auto-discover-region: "_env:UPLOAD_S3_AUTO_DISCOVER_REGION:true"
|
||||
disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false"
|
||||
upload-cache-bucket: "uni2work-uploads"
|
||||
|
||||
server-sessions:
|
||||
idle-timeout: 28807
|
||||
absolute-timeout: 604801
|
||||
|
||||
@ -149,6 +149,7 @@ dependencies:
|
||||
- clock
|
||||
- HsYAML
|
||||
- HsYAML-aeson
|
||||
- minio-hs
|
||||
|
||||
other-extensions:
|
||||
- GeneralizedNewtypeDeriving
|
||||
|
||||
@ -104,7 +104,7 @@ let
|
||||
set +xe
|
||||
fi
|
||||
|
||||
if [[ -z "$UPLOAD_S3_URL" ]]; then
|
||||
if [[ -z "$UPLOAD_S3_HOST" ]]; then
|
||||
set -xe
|
||||
|
||||
cleanup_minio() {
|
||||
@ -121,7 +121,7 @@ let
|
||||
|
||||
sleep 1
|
||||
|
||||
export UPLOAD_S3_URL=http://localhost:9000
|
||||
export UPLOAD_S3_HOST=localhost UPLOAD_S3_PORT=9000 UPLOAD_S3_SSL=false
|
||||
export UPLOAD_S3_KEY_ID=$(${pkgs.jq}/bin/jq -r '.credential.accessKey' ''${minio_dir}/.minio.sys/config/config.json)
|
||||
export UPLOAD_S3_KEY=$(${pkgs.jq}/bin/jq -r '.credential.secretKey' ''${minio_dir}/.minio.sys/config/config.json)
|
||||
|
||||
|
||||
@ -96,7 +96,9 @@ import qualified Data.Acid.Memory as Acid
|
||||
import qualified Web.ServerSession.Backend.Acid as Acid
|
||||
|
||||
import qualified Ldap.Client as Ldap (Host(Plain, Tls))
|
||||
|
||||
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
-- Import all relevant handler modules here.
|
||||
-- (HPack takes care to add new modules to our cabal file nowadays.)
|
||||
import Handler.News
|
||||
@ -176,7 +178,7 @@ makeFoundation appSettings'@AppSettings{..} = do
|
||||
-- logging function. To get out of this loop, we initially create a
|
||||
-- temporary foundation without a real connection pool, get a log function
|
||||
-- from there, and then create the real foundation.
|
||||
let mkFoundation appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached = UniWorX {..}
|
||||
let mkFoundation appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache = UniWorX {..}
|
||||
-- The UniWorX {..} syntax is an example of record wild cards. For more
|
||||
-- information, see:
|
||||
-- https://ocharles.org.uk/blog/posts/2014-12-04-record-wildcards.html
|
||||
@ -191,6 +193,7 @@ makeFoundation appSettings'@AppSettings{..} = do
|
||||
(error "JSONWebKeySet forced in tempFoundation")
|
||||
(error "ClusterID forced in tempFoundation")
|
||||
(error "memcached forced in tempFoundation")
|
||||
(error "MinioConn forced in tempFoundation")
|
||||
|
||||
runAppLoggingT tempFoundation $ do
|
||||
$logInfoS "InstanceID" $ UUID.toText appInstanceID
|
||||
@ -240,7 +243,15 @@ makeFoundation appSettings'@AppSettings{..} = do
|
||||
|
||||
appSessionStore <- mkSessionStore appSettings' sqlPool `runSqlPool` sqlPool
|
||||
|
||||
let foundation = mkFoundation sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached
|
||||
appUploadCache <- for appUploadCacheConf $ \minioConf -> liftIO $ do
|
||||
conn <- Minio.connect minioConf
|
||||
let isBucketExists Minio.BucketAlreadyOwnedByYou = True
|
||||
isBucketExists _ = False
|
||||
either throwM return <=< Minio.runMinioWith conn $
|
||||
handleIf isBucketExists (const $ return ()) $ Minio.makeBucket appUploadCacheBucket Nothing
|
||||
return conn
|
||||
|
||||
let foundation = mkFoundation sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache
|
||||
|
||||
-- Return the foundation
|
||||
$logDebugS "setup" "Done"
|
||||
|
||||
@ -18,6 +18,7 @@ import qualified Crypto.Saltine.Core.AEAD as AEAD
|
||||
import qualified Jose.Jwk as Jose
|
||||
|
||||
import qualified Database.Memcached.Binary.IO as Memcached
|
||||
import Network.Minio (MinioConn)
|
||||
|
||||
|
||||
type SMTPPool = Pool SMTPConnection
|
||||
@ -51,6 +52,7 @@ data UniWorX = UniWorX
|
||||
, appJSONWebKeySet :: Jose.JwkSet
|
||||
, appHealthReport :: TVar (Set (UTCTime, HealthReport))
|
||||
, appMemcached :: Maybe (AEAD.Key, Memcached.Connection)
|
||||
, appUploadCache :: Maybe MinioConn
|
||||
}
|
||||
|
||||
makeLenses_ ''UniWorX
|
||||
|
||||
@ -8,7 +8,13 @@ import Import
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
import Handler.Utils.Minio
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
import qualified Data.ByteString.Base64.URL as Base64
|
||||
import qualified Data.ByteArray as ByteArray
|
||||
|
||||
|
||||
data SourceFilesException
|
||||
= SourceFilesMismatchedHashes
|
||||
| SourceFilesContentUnavailable
|
||||
@ -24,7 +30,17 @@ sourceFile FileReference{..} = do
|
||||
mFileContent <- traverse get $ FileContentKey <$> fileReferenceContent
|
||||
fileContent <- if
|
||||
| is (_Just . _Nothing) mFileContent
|
||||
-> throwM SourceFilesContentUnavailable
|
||||
, Just fileContentHash <- fileReferenceContent -- Not a restriction
|
||||
-> maybeT (throwM SourceFilesContentUnavailable) $ do
|
||||
let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
fmap Just . (hoistMaybe =<<) . runAppMinio . runMaybeT $ do
|
||||
let isDoesNotExist :: HttpException -> Bool
|
||||
isDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _))
|
||||
= responseStatus resp == notFound404
|
||||
isDoesNotExist _ = False
|
||||
objRes <- catchIfMaybeT isDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| C.fold
|
||||
| fmap (fmap fileContentHash) mFileContent /= fmap Just fileReferenceContent
|
||||
-> throwM SourceFilesMismatchedHashes
|
||||
| Just fileContent' <- fileContentContent <$> join mFileContent
|
||||
|
||||
19
src/Handler/Utils/Minio.hs
Normal file
19
src/Handler/Utils/Minio.hs
Normal file
@ -0,0 +1,19 @@
|
||||
module Handler.Utils.Minio
|
||||
( runAppMinio
|
||||
) where
|
||||
|
||||
import Import.NoFoundation
|
||||
import Foundation.Type
|
||||
|
||||
import Network.Minio (Minio)
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
|
||||
runAppMinio :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadPlus m
|
||||
)
|
||||
=> Minio a -> m a
|
||||
runAppMinio act = do
|
||||
conn <- hoistMaybe =<< getsYesod appUploadCache
|
||||
either throwM return <=< liftIO $ Minio.runMinioWith conn act
|
||||
@ -161,6 +161,7 @@ import Data.MonoTraversable.Instances as Import ()
|
||||
import Web.Cookie.Instances as Import ()
|
||||
import Network.HTTP.Types.Method.Instances as Import ()
|
||||
import Crypto.Random.Instances as Import ()
|
||||
import Network.Minio.Instances as Import ()
|
||||
|
||||
import Crypto.Hash as Import (Digest, SHA3_256, SHA3_512)
|
||||
import Crypto.Random as Import (ChaChaDRG, Seed)
|
||||
|
||||
24
src/Network/Minio/Instances.hs
Normal file
24
src/Network/Minio/Instances.hs
Normal file
@ -0,0 +1,24 @@
|
||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
|
||||
module Network.Minio.Instances
|
||||
(
|
||||
) where
|
||||
|
||||
import ClassyPrelude
|
||||
|
||||
import Network.Minio
|
||||
|
||||
import qualified UnliftIO.Exception as UnliftIO
|
||||
import Control.Monad.Catch
|
||||
|
||||
|
||||
instance MonadThrow Minio where
|
||||
throwM = UnliftIO.throwIO
|
||||
|
||||
instance MonadCatch Minio where
|
||||
catch = UnliftIO.catch
|
||||
|
||||
instance MonadMask Minio where
|
||||
mask = UnliftIO.mask
|
||||
uninterruptibleMask = UnliftIO.uninterruptibleMask
|
||||
generalBracket acq rel inner = withUnliftIO $ \UnliftIO{..} -> generalBracket (unliftIO acq) ((unliftIO .) . rel) $ unliftIO . inner
|
||||
@ -67,6 +67,8 @@ import Text.Show (showParen, showString)
|
||||
|
||||
import qualified Data.List.PointedList as P
|
||||
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
|
||||
-- | Runtime settings to configure this application. These settings can be
|
||||
-- loaded from various sources: defaults, environment variables, config files,
|
||||
@ -164,6 +166,9 @@ data AppSettings = AppSettings
|
||||
|
||||
, appMemcachedConf :: Maybe MemcachedConf
|
||||
|
||||
, appUploadCacheConf :: Maybe Minio.ConnectInfo
|
||||
, appUploadCacheBucket :: Minio.Bucket
|
||||
|
||||
, appFavouritesQuickActionsBurstsize
|
||||
, appFavouritesQuickActionsAvgInverseRate :: Word64
|
||||
, appFavouritesQuickActionsTimeout :: DiffTime
|
||||
@ -385,6 +390,20 @@ instance FromJSON JwtEncoding where
|
||||
return $ JweEncoding alg enc
|
||||
]
|
||||
|
||||
instance FromJSON Minio.ConnectInfo where
|
||||
parseJSON v@(String _) = fromString <$> parseJSON v
|
||||
parseJSON v = flip (withObject "ConnectInfo") v $ \o -> do
|
||||
connectHost <- o .:? "host" .!= ""
|
||||
connectPort <- o .: "port"
|
||||
connectAccessKey <- o .:? "access-key" .!= ""
|
||||
connectSecretKey <- o .:? "secret-key" .!= ""
|
||||
connectIsSecure <- o .: "is-secure"
|
||||
connectRegion <- o .:? "region" .!= ""
|
||||
connectAutoDiscoverRegion <- o .:? "auto-discover-region" .!= True
|
||||
connectDisableTLSCertValidation <- o .:? "disable-cert-validation" .!= False
|
||||
return Minio.ConnectInfo{..}
|
||||
|
||||
|
||||
instance FromJSON ServerSessionSettings where
|
||||
parseJSON = withObject "ServerSession.State" $ \o -> do
|
||||
idleTimeout <- o .:? "idle-timeout"
|
||||
@ -517,6 +536,9 @@ instance FromJSON AppSettings where
|
||||
appFavouritesQuickActionsTimeout <- o .: "favourites-quick-actions-timeout"
|
||||
appFavouritesQuickActionsCacheTTL <- o .: "favourites-quick-actions-cache-ttl"
|
||||
|
||||
appUploadCacheConf <- assertM (not . null . Minio.connectHost) <$> o .:? "upload-cache"
|
||||
appUploadCacheBucket <- o .: "upload-cache-bucket"
|
||||
|
||||
return AppSettings{..}
|
||||
|
||||
makeClassy_ ''AppSettings
|
||||
|
||||
@ -4,33 +4,61 @@ module Utils.Files
|
||||
) where
|
||||
|
||||
import Import.NoFoundation
|
||||
import Foundation.Type
|
||||
import Handler.Utils.Minio
|
||||
import qualified Network.Minio as Minio
|
||||
|
||||
import qualified Crypto.Hash as Crypto (hash)
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
import qualified Data.ByteString.Base64.URL as Base64
|
||||
import qualified Data.ByteArray as ByteArray
|
||||
|
||||
sinkFiles :: MonadIO m => ConduitT File FileReference (SqlPersistT m) ()
|
||||
|
||||
sinkFiles :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => ConduitT File FileReference (SqlPersistT m) ()
|
||||
sinkFiles = C.mapM sinkFile
|
||||
|
||||
sinkFile :: MonadIO m => File -> SqlPersistT m FileReference
|
||||
sinkFile File{..} = do
|
||||
fileReferenceContent <- for fileContent $ \fileContentContent -> do
|
||||
let fileContentHash = Crypto.hash fileContentContent
|
||||
unlessM (exists [ FileContentHash ==. fileContentHash ]) $
|
||||
repsert (FileContentKey fileContentHash) FileContent{..}
|
||||
return fileContentHash
|
||||
sinkFile :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX) => File -> SqlPersistT m FileReference
|
||||
sinkFile File{ fileContent = Nothing, .. } = return FileReference
|
||||
{ fileReferenceContent = Nothing
|
||||
, fileReferenceTitle = fileTitle
|
||||
, fileReferenceModified = fileModified
|
||||
}
|
||||
sinkFile File{ fileContent = Just fileContentContent, .. } = do
|
||||
inDB <- exists [ FileContentHash ==. fileContentHash ]
|
||||
|
||||
let sinkFileDB = unless inDB $ repsert (FileContentKey fileContentHash) FileContent{..}
|
||||
maybeT sinkFileDB $ do
|
||||
let uploadName = decodeUtf8 . Base64.encodeUnpadded $ ByteArray.convert fileContentHash
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
unless inDB . runAppMinio $ do
|
||||
let isDoesNotExist :: HttpException -> Bool
|
||||
isDoesNotExist (HttpExceptionRequest _ (StatusCodeException resp _))
|
||||
= responseStatus resp == notFound404
|
||||
isDoesNotExist _ = False
|
||||
uploadExists <- handleIf isDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket uploadName Minio.defaultGetObjectOptions
|
||||
unless uploadExists $ do
|
||||
let
|
||||
pooOptions = Minio.defaultPutObjectOptions
|
||||
{ Minio.pooCacheControl = Just "immutable"
|
||||
}
|
||||
Minio.putObject uploadBucket uploadName (C.sourceLazy $ fromStrict fileContentContent) (Just . fromIntegral $ olength fileContentContent) pooOptions
|
||||
-- Note that MinIO does not accept length zero uploads without an explicit length specification (not `Nothing` in the line above for the api we use)
|
||||
|
||||
return FileReference
|
||||
{ fileReferenceContent
|
||||
{ fileReferenceContent = Just fileContentHash
|
||||
, fileReferenceTitle = fileTitle
|
||||
, fileReferenceModified = fileModified
|
||||
}
|
||||
|
||||
sinkFiles' :: (MonadIO m, HasFileReference record) => ConduitT (File, FileReferenceResidual record) record (SqlPersistT m) ()
|
||||
where
|
||||
fileContentHash = Crypto.hash fileContentContent
|
||||
|
||||
|
||||
sinkFiles' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, HasFileReference record) => ConduitT (File, FileReferenceResidual record) record (SqlPersistT m) ()
|
||||
sinkFiles' = C.mapM $ uncurry sinkFile'
|
||||
|
||||
sinkFile' :: (MonadIO m, HasFileReference record) => File -> FileReferenceResidual record -> SqlPersistT m record
|
||||
sinkFile' :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, HasFileReference record) => File -> FileReferenceResidual record -> SqlPersistT m record
|
||||
sinkFile' file residual = do
|
||||
reference <- sinkFile file
|
||||
return $ _FileReference # (reference, residual)
|
||||
|
||||
@ -34,6 +34,8 @@ extra-deps:
|
||||
commit: f8170266ab25b533576e96715bedffc5aa4f19fa
|
||||
subdirs:
|
||||
- colonnade
|
||||
- git: git@gitlab2.rz.ifi.lmu.de:uni2work/minio-hs.git
|
||||
commit: 9a4e3889a93cf71d6bbf45b673f6353b39f15d9f
|
||||
|
||||
|
||||
# - colonnade-1.2.0.2
|
||||
@ -116,5 +118,7 @@ extra-deps:
|
||||
|
||||
- unordered-containers-0.2.11.0
|
||||
|
||||
- base64-bytestring-1.1.0.0
|
||||
|
||||
resolver: lts-15.12
|
||||
allow-newer: true
|
||||
|
||||
@ -150,6 +150,20 @@ packages:
|
||||
subdir: colonnade
|
||||
git: git@gitlab2.rz.ifi.lmu.de:uni2work/colonnade.git
|
||||
commit: f8170266ab25b533576e96715bedffc5aa4f19fa
|
||||
- completed:
|
||||
cabal-file:
|
||||
size: 9845
|
||||
sha256: 674630347209bc5f7984e8e9d93293510489921f2d2d6092ad1c9b8c61b6560a
|
||||
name: minio-hs
|
||||
version: 1.5.2
|
||||
git: git@gitlab2.rz.ifi.lmu.de:uni2work/minio-hs.git
|
||||
pantry-tree:
|
||||
size: 4517
|
||||
sha256: ef7c5960da571c6cb41337b0bd30740bac92b4781b375be704093fdadd17330d
|
||||
commit: 9a4e3889a93cf71d6bbf45b673f6353b39f15d9f
|
||||
original:
|
||||
git: git@gitlab2.rz.ifi.lmu.de:uni2work/minio-hs.git
|
||||
commit: 9a4e3889a93cf71d6bbf45b673f6353b39f15d9f
|
||||
- completed:
|
||||
hackage: hsass-0.8.0@sha256:82d55fb2a10342accbc4fe80d263163f40a138d8636e275aa31ffa81b14abf01,2792
|
||||
pantry-tree:
|
||||
@ -325,6 +339,13 @@ packages:
|
||||
sha256: d9b83f62373f509a441223f22f12e22e39b38ef3275dfca7c190a4795bebfed5
|
||||
original:
|
||||
hackage: unordered-containers-0.2.11.0
|
||||
- completed:
|
||||
hackage: base64-bytestring-1.1.0.0@sha256:190264fef9e65d9085f00ccda419137096d1dc94777c58272bc96821dc7f37c3,2334
|
||||
pantry-tree:
|
||||
size: 850
|
||||
sha256: 9ade5b5911df97c37b249b84f123297049f19578cae171c647bf47683633427c
|
||||
original:
|
||||
hackage: base64-bytestring-1.1.0.0
|
||||
snapshots:
|
||||
- completed:
|
||||
size: 494635
|
||||
|
||||
Loading…
Reference in New Issue
Block a user