chore(files): test roundtripping through minio & db
This commit is contained in:
parent
2b5d01b14a
commit
28e93c8fec
@ -1,5 +1,6 @@
|
||||
database:
|
||||
database: "_env:PGDATABASE_TEST:uniworx_test"
|
||||
upload-cache-bucket: "uni2work-test-uploads"
|
||||
|
||||
log-settings:
|
||||
detailed: true
|
||||
|
||||
@ -311,6 +311,7 @@ tests:
|
||||
- generic-arbitrary
|
||||
- http-types
|
||||
- yesod-persistent
|
||||
- quickcheck-io
|
||||
ghc-options:
|
||||
- -fno-warn-orphans
|
||||
- -threaded -rtsopts "-with-rtsopts=-N -T"
|
||||
|
||||
@ -2,7 +2,7 @@ module Handler.Utils.Files
|
||||
( sourceFile, sourceFile'
|
||||
, sourceFiles, sourceFiles'
|
||||
, SourceFilesException(..)
|
||||
, sourceFileDB
|
||||
, sourceFileDB, sourceFileMinio
|
||||
, acceptFile
|
||||
) where
|
||||
|
||||
@ -50,6 +50,27 @@ sourceFileDB fileReference = do
|
||||
chunkHashes .| C.map E.unValue .| awaitForever (\chunkHash -> C.unfoldM (retrieveChunk chunkHash) $ Just (1 :: Int))
|
||||
|
||||
|
||||
sourceFileMinio :: (MonadThrow m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m)
|
||||
=> FileContentReference -> ConduitT () ByteString m ()
|
||||
sourceFileMinio fileReference = do
|
||||
chunkVar <- newEmptyTMVarIO
|
||||
minioAsync <- lift . allocateLinkedAsync $
|
||||
maybeT (throwM SourceFilesContentUnavailable) $ do
|
||||
let uploadName = minioFileReference # fileReference
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
hoistMaybe <=< runAppMinio . runMaybeT $ do
|
||||
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar)
|
||||
let go = do
|
||||
mChunk <- atomically $ Right <$> takeTMVar chunkVar
|
||||
<|> Left <$> waitCatchSTM minioAsync
|
||||
case mChunk of
|
||||
Right chunk -> yield chunk >> go
|
||||
Left (Right ()) -> return ()
|
||||
Left (Left exc) -> throwM exc
|
||||
in go
|
||||
|
||||
|
||||
sourceFiles :: Monad m => ConduitT FileReference DBFile m ()
|
||||
sourceFiles = C.map sourceFile
|
||||
|
||||
@ -65,25 +86,7 @@ sourceFile FileReference{..} = File
|
||||
= return ()
|
||||
toFileContent fileReference = do
|
||||
inDB <- lift . E.selectExists . E.from $ \fileContentEntry -> E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
|
||||
if
|
||||
| inDB -> sourceFileDB fileReference
|
||||
| otherwise -> do
|
||||
chunkVar <- newEmptyTMVarIO
|
||||
minioAsync <- lift . allocateLinkedAsync $
|
||||
maybeT (throwM SourceFilesContentUnavailable) $ do
|
||||
let uploadName = minioFileReference # fileReference
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
hoistMaybe <=< runAppMinio . runMaybeT $ do
|
||||
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket uploadName Minio.defaultGetObjectOptions
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| C.mapM_ (atomically . putTMVar chunkVar)
|
||||
let go = do
|
||||
mChunk <- atomically $ Right <$> takeTMVar chunkVar
|
||||
<|> Left <$> waitCatchSTM minioAsync
|
||||
case mChunk of
|
||||
Right chunk -> yield chunk >> go
|
||||
Left (Right ()) -> return ()
|
||||
Left (Left exc) -> throwM exc
|
||||
in go
|
||||
bool sourceFileMinio sourceFileDB inDB fileReference
|
||||
|
||||
sourceFiles' :: forall file m. (HasFileReference file, Monad m) => ConduitT file DBFile m ()
|
||||
sourceFiles' = C.map sourceFile'
|
||||
|
||||
@ -3,7 +3,7 @@ module Utils.Files
|
||||
, sinkFile', sinkFiles'
|
||||
, FileUploads
|
||||
, replaceFileReferences, replaceFileReferences'
|
||||
, sinkFileDB
|
||||
, sinkFileDB, sinkFileMinio
|
||||
) where
|
||||
|
||||
import Import.NoFoundation
|
||||
@ -85,6 +85,48 @@ sinkFileDB doReplace fileContentContent = do
|
||||
return fileContentHash
|
||||
where fileContentChunkContentBased = True
|
||||
|
||||
|
||||
sinkFileMinio :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m)
|
||||
=> ConduitT () ByteString m ()
|
||||
-> MaybeT m FileContentReference
|
||||
-- ^ Cannot deal with zero length uploads
|
||||
sinkFileMinio fileContentContent = do
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
chunk <- liftIO newEmptyMVar
|
||||
let putChunks = do
|
||||
nextChunk <- await
|
||||
case nextChunk of
|
||||
Nothing
|
||||
-> putMVar chunk Nothing
|
||||
Just nextChunk'
|
||||
-> putMVar chunk (Just nextChunk') >> yield nextChunk' >> putChunks
|
||||
sinkAsync <- lift . allocateLinkedAsync . runConduit
|
||||
$ fileContentContent
|
||||
.| putChunks
|
||||
.| Crypto.sinkHash
|
||||
|
||||
runAppMinio $ do
|
||||
tmpUUID <- liftIO UUID.nextRandom
|
||||
let uploadName = ".tmp." <> toPathPiece tmpUUID
|
||||
pooOptions = Minio.defaultPutObjectOptions
|
||||
{ Minio.pooCacheControl = Just "immutable"
|
||||
}
|
||||
Minio.putObject uploadBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions
|
||||
fileContentHash <- review _Wrapped <$> waitAsync sinkAsync
|
||||
let dstName = minioFileReference # fileContentHash
|
||||
copySrc = Minio.defaultSourceInfo
|
||||
{ Minio.srcBucket = uploadBucket, Minio.srcObject = uploadName
|
||||
}
|
||||
copyDst = Minio.defaultDestinationInfo
|
||||
{ Minio.dstBucket = uploadBucket
|
||||
, Minio.dstObject = dstName
|
||||
}
|
||||
uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions
|
||||
unless uploadExists $
|
||||
Minio.copyObject copyDst copySrc
|
||||
Minio.removeObject uploadBucket uploadName
|
||||
return fileContentHash
|
||||
|
||||
|
||||
sinkFiles :: (MonadCatch m, MonadHandler m, HandlerSite m ~ UniWorX, MonadUnliftIO m, BackendCompatible SqlBackend (YesodPersistBackend UniWorX), YesodPersist UniWorX) => ConduitT (File (SqlPersistT m)) FileReference (SqlPersistT m) ()
|
||||
sinkFiles = C.mapM sinkFile
|
||||
@ -99,43 +141,8 @@ sinkFile File{ fileContent = Just fileContentContent, .. } = do
|
||||
(unsealConduitT -> fileContentContent', isEmpty) <- fileContentContent $$+ is _Nothing <$> C.peekE
|
||||
|
||||
fileContentHash <- if
|
||||
| not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ do
|
||||
uploadBucket <- getsYesod $ views appSettings appUploadCacheBucket
|
||||
chunk <- liftIO newEmptyMVar
|
||||
let putChunks = do
|
||||
nextChunk <- await
|
||||
case nextChunk of
|
||||
Nothing
|
||||
-> putMVar chunk Nothing
|
||||
Just nextChunk'
|
||||
-> putMVar chunk (Just nextChunk') >> yield nextChunk' >> putChunks
|
||||
sinkAsync <- lift . allocateLinkedAsync . runConduit
|
||||
$ fileContentContent'
|
||||
.| putChunks
|
||||
.| Crypto.sinkHash
|
||||
|
||||
runAppMinio $ do
|
||||
tmpUUID <- liftIO UUID.nextRandom
|
||||
let uploadName = ".tmp." <> toPathPiece tmpUUID
|
||||
pooOptions = Minio.defaultPutObjectOptions
|
||||
{ Minio.pooCacheControl = Just "immutable"
|
||||
}
|
||||
Minio.putObject uploadBucket uploadName (C.unfoldM (\x -> fmap (, x) <$> takeMVar chunk) ()) Nothing pooOptions
|
||||
fileContentHash <- review _Wrapped <$> waitAsync sinkAsync
|
||||
let dstName = minioFileReference # fileContentHash
|
||||
copySrc = Minio.defaultSourceInfo
|
||||
{ Minio.srcBucket = uploadBucket, Minio.srcObject = uploadName
|
||||
}
|
||||
copyDst = Minio.defaultDestinationInfo
|
||||
{ Minio.dstBucket = uploadBucket
|
||||
, Minio.dstObject = dstName
|
||||
}
|
||||
uploadExists <- handleIf minioIsDoesNotExist (const $ return False) $ True <$ Minio.statObject uploadBucket dstName Minio.defaultGetObjectOptions
|
||||
unless uploadExists $
|
||||
Minio.copyObject copyDst copySrc
|
||||
Minio.removeObject uploadBucket uploadName
|
||||
return fileContentHash
|
||||
| otherwise -> return $$(liftTyped $ FileContentReference $$(emptyHash))
|
||||
| not isEmpty -> maybeT (sinkFileDB False fileContentContent') $ sinkFileMinio fileContentContent'
|
||||
| otherwise -> return $$(liftTyped $ FileContentReference $$(emptyHash))
|
||||
|
||||
return FileReference
|
||||
{ fileReferenceContent = Just fileContentHash
|
||||
|
||||
69
test/Handler/Utils/FilesSpec.hs
Normal file
69
test/Handler/Utils/FilesSpec.hs
Normal file
@ -0,0 +1,69 @@
|
||||
module Handler.Utils.FilesSpec where
|
||||
|
||||
import TestImport hiding (runDB)
|
||||
import Yesod.Core.Handler (getsYesod)
|
||||
import Yesod.Persist.Core (YesodPersist(runDB))
|
||||
|
||||
import ModelSpec ()
|
||||
|
||||
import Handler.Utils.Files
|
||||
import Utils.Files
|
||||
|
||||
import Control.Lens.Extras (is)
|
||||
|
||||
import Data.Conduit
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
import Control.Monad.Fail (MonadFail(..))
|
||||
import Control.Monad.Trans.Maybe (runMaybeT)
|
||||
|
||||
import Utils.Sql (setSerializable)
|
||||
|
||||
|
||||
spec :: Spec
|
||||
spec = withApp . describe "File handling" $ do
|
||||
describe "Minio" $ do
|
||||
modifyMaxSuccess (`div` 10) . it "roundtrips" $ \(tSite, _) -> property $ do
|
||||
fileContentContent <- arbitrary
|
||||
`suchThat` (\fc -> not . null . runIdentity . runConduit $ fc .| C.sinkLazy) -- Minio (and by extension `sinkFileMinio`) cannot deal with zero length uploads; this is handled seperately by `sinkFile`
|
||||
file' <- arbitrary :: Gen PureFile
|
||||
let file = file' { fileContent = Just fileContentContent }
|
||||
return . propertyIO . unsafeHandler tSite $ do
|
||||
haveUploadCache <- getsYesod $ is _Just . appUploadCache
|
||||
unless haveUploadCache . liftIO $
|
||||
pendingWith "No upload cache (minio) configured"
|
||||
|
||||
let fRef = file ^. _FileReference . _1 . _fileReferenceContent
|
||||
fRef' <- runMaybeT . sinkFileMinio $ transPipe generalize fileContentContent
|
||||
|
||||
liftIO $ fRef' `shouldBe` fRef
|
||||
|
||||
fileReferenceContentContent <- liftIO . maybe (fail "Produced no file reference") return $ fRef'
|
||||
|
||||
suppliedContent <- runConduit $ transPipe generalize fileContentContent
|
||||
.| C.sinkLazy
|
||||
readContent <- runConduit $ sourceFileMinio fileReferenceContentContent
|
||||
.| C.takeE (succ $ olength suppliedContent)
|
||||
.| C.sinkLazy
|
||||
|
||||
liftIO $ readContent `shouldBe` suppliedContent
|
||||
describe "DB" $ do
|
||||
modifyMaxSuccess (`div` 10) . it "roundtrips" $ \(tSite, _) -> property $ do
|
||||
fileContentContent <- arbitrary
|
||||
`suchThat` (\fc -> not . null . runIdentity . runConduit $ fc .| C.sinkLazy) -- Minio (and by extension `sinkFileMinio`) cannot deal with zero length uploads; this is handled seperately by `sinkFile`
|
||||
file' <- arbitrary :: Gen PureFile
|
||||
let file = file' { fileContent = Just fileContentContent }
|
||||
return . propertyIO . unsafeHandler tSite $ do
|
||||
let fRef = file ^. _FileReference . _1 . _fileReferenceContent
|
||||
fRef' <- runDB . setSerializable . sinkFileDB True $ transPipe generalize fileContentContent
|
||||
|
||||
liftIO $ Just fRef' `shouldBe` fRef
|
||||
|
||||
suppliedContent <- runConduit $ transPipe generalize fileContentContent
|
||||
.| C.sinkLazy
|
||||
readContent <- runDB . runConduit
|
||||
$ sourceFileDB fRef'
|
||||
.| C.takeE (succ $ olength suppliedContent)
|
||||
.| C.sinkLazy
|
||||
|
||||
liftIO $ readContent `shouldBe` suppliedContent
|
||||
@ -36,8 +36,7 @@ spec = describe "Zip file handling" $ do
|
||||
describe "has compatible encoding to, and decoding from zip files" . forM_ universeF $ \strategy ->
|
||||
modifyMaxSuccess (bool id (37 *) $ strategy == (ZipProduceInterleaved, ZipConsumeInterleaved)) . it (show strategy) . property $ do
|
||||
zipFiles <- listOf arbitrary :: Gen [PureFile]
|
||||
return . property $ do
|
||||
|
||||
return . propertyIO $ do
|
||||
let zipProduceBuffered
|
||||
= evaluate . force <=< runConduitRes $ zipProduceInterleaved .| C.sinkLazy
|
||||
zipProduceInterleaved
|
||||
|
||||
@ -34,6 +34,7 @@ import Control.Monad.Catch.Pure (Catch, runCatch)
|
||||
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
|
||||
import Data.Conduit
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
import Data.Ratio ((%))
|
||||
@ -141,6 +142,9 @@ instance Arbitrary User where
|
||||
return User{..}
|
||||
shrink = genericShrink
|
||||
|
||||
instance (LazySequence lazy strict, Arbitrary lazy, Monad m) => Arbitrary (ConduitT () strict m ()) where
|
||||
arbitrary = C.sourceLazy <$> arbitrary
|
||||
|
||||
scaleRatio :: Rational -> Int -> Int
|
||||
scaleRatio r = ceiling . (* r) . fromIntegral
|
||||
|
||||
@ -151,7 +155,7 @@ instance Monad m => Arbitrary (File m) where
|
||||
fileModified <- (addUTCTime <$> arbitrary <*> pure (UTCTime date 0)) `suchThat` inZipRange
|
||||
fileContent <- oneof
|
||||
[ pure Nothing
|
||||
, Just . C.sourceLazy <$> scale (scaleRatio $ 7 % 8) arbitrary
|
||||
, Just <$> scale (scaleRatio $ 7 % 8) arbitrary
|
||||
]
|
||||
return File{..}
|
||||
where
|
||||
|
||||
@ -36,6 +36,7 @@ import Test.QuickCheck.Classes.HttpApiData as X
|
||||
import Test.QuickCheck.Classes.Universe as X
|
||||
import Test.QuickCheck.Classes.Binary as X
|
||||
import Test.QuickCheck.Classes.Csv as X
|
||||
import Test.QuickCheck.IO as X
|
||||
import Data.Proxy as X
|
||||
import Data.UUID as X (UUID)
|
||||
import System.IO as X (hPrint, hPutStrLn)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user