diff -ruN orig/Data/Conduit/TMChan.hs new/Data/Conduit/TMChan.hs --- orig/Data/Conduit/TMChan.hs 2014-08-27 18:36:44.141176333 +0300 +++ new/Data/Conduit/TMChan.hs 2014-08-27 18:36:43.000000000 +0300 @@ -63,22 +63,23 @@ import Control.Concurrent.STM.TMChan import Data.Conduit -import Data.Conduit.Internal (Pipe (..), ConduitM (..)) +import qualified Data.Conduit.List as CL -chanSource +chanSource :: MonadIO m => chan -- ^ The channel. -> (chan -> STM (Maybe a)) -- ^ The 'read' function. -> (chan -> STM ()) -- ^ The 'close' function. -> Source m a -chanSource ch reader closer = ConduitM src - where - src = PipeM pull - pull = do a <- liftSTM $ reader ch - case a of - Just x -> return $ HaveOutput src close x - Nothing -> return $ Done () - close = liftSTM $ closer ch +chanSource ch reader closer = + loop + where + loop = do + a <- liftSTM $ reader ch + case a of + Just x -> yieldOr x close >> loop + Nothing -> return () + close = liftSTM $ closer ch {-# INLINE chanSource #-} chanSink @@ -87,13 +88,9 @@ -> (chan -> a -> STM ()) -- ^ The 'write' function. -> (chan -> STM ()) -- ^ The 'close' function. -> Sink a m () -chanSink ch writer closer = ConduitM sink - where - sink = NeedInput push close - - push input = PipeM ((liftIO . atomically $ writer ch input) - >> (return $ NeedInput push close)) - close = const . liftSTM $ closer ch +chanSink ch writer closer = do + CL.mapM_ $ liftIO . atomically . writer ch + liftSTM $ closer ch {-# INLINE chanSink #-} -- | A simple wrapper around a TBMChan. As data is pushed into the channel, the diff -ruN orig/Data/Conduit/TQueue.hs new/Data/Conduit/TQueue.hs --- orig/Data/Conduit/TQueue.hs 2014-08-27 18:36:44.141176333 +0300 +++ new/Data/Conduit/TQueue.hs 2014-08-27 18:36:43.000000000 +0300 @@ -58,46 +58,28 @@ import Control.Monad import Control.Monad.IO.Class import Data.Conduit -import Data.Conduit.Internal +import qualified Data.Conduit.List as CL -- | A simple wrapper around a "TQueue". As data is pushed into the queue, the -- source will read it and pass it down the conduit pipeline. sourceTQueue :: MonadIO m => TQueue a -> Source m a -sourceTQueue q = ConduitM src - where src = PipeM pull - pull = do x <- liftSTM $ readTQueue q - return $ HaveOutput src close x - close = return () +sourceTQueue q = forever $ liftSTM (readTQueue q) >>= yield -- | A simple wrapper around a "TQueue". As data is pushed into this sink, it -- will magically begin to appear in the queue. sinkTQueue :: MonadIO m => TQueue a -> Sink a m () -sinkTQueue q = ConduitM src - where src = sink - sink = NeedInput push close - push input = PipeM ((liftSTM $ writeTQueue q input) - >> (return $ NeedInput push close)) - close _ = return () +sinkTQueue q = CL.mapM_ (liftSTM . writeTQueue q) -- | A simple wrapper around a "TBQueue". As data is pushed into the queue, the -- source will read it and pass it down the conduit pipeline. sourceTBQueue :: MonadIO m => TBQueue a -> Source m a -sourceTBQueue q = ConduitM src - where src = PipeM pull - pull = do x <- liftSTM $ readTBQueue q - return $ HaveOutput src close x - close = return () +sourceTBQueue q = forever $ liftSTM (readTBQueue q) >>= yield -- | A simple wrapper around a "TBQueue". As data is pushed into this sink, it -- will magically begin to appear in the queue. Boolean argument is used -- to specify if queue should be closed when the sink is closed. sinkTBQueue :: MonadIO m => TBQueue a -> Sink a m () -sinkTBQueue q = ConduitM src - where src = sink - sink = NeedInput push close - push input = PipeM ((liftSTM $ writeTBQueue q input) - >> (return $ NeedInput push close)) - close _ = return () +sinkTBQueue q = CL.mapM_ (liftSTM . writeTBQueue q) -- | A convenience wrapper for creating a source and sink TBQueue of the given -- size at once, without exposing the underlying queue. @@ -109,14 +91,15 @@ -- source will read it and pass it down the conduit pipeline. When the -- queue is closed, the source will close also. sourceTMQueue :: MonadIO m => TMQueue a -> Source m a -sourceTMQueue q = ConduitM src - where src = PipeM pull - pull = do mx <- liftSTM $ readTMQueue q - case mx of - Nothing -> return $ Done () - Just x -> return $ HaveOutput src close x - close = do liftSTM $ closeTMQueue q - return () +sourceTMQueue q = + loop + where + loop = do + mx <- liftSTM $ readTMQueue q + case mx of + Nothing -> return () + Just x -> yieldOr x close >> loop + close = liftSTM $ closeTMQueue q -- | A simple wrapper around a "TMQueue". As data is pushed into this sink, it -- will magically begin to appear in the queue. @@ -124,26 +107,23 @@ => TMQueue a -> Bool -- ^ Should the queue be closed when the sink is closed? -> Sink a m () -sinkTMQueue q shouldClose = ConduitM src - where src = sink - sink = NeedInput push close - push input = PipeM ((liftSTM $ writeTMQueue q input) - >> (return $ NeedInput push close)) - close _ = do when shouldClose (liftSTM $ closeTMQueue q) - return () +sinkTMQueue q shouldClose = do + CL.mapM_ (liftSTM . writeTMQueue q) + when shouldClose (liftSTM $ closeTMQueue q) -- | A simple wrapper around a "TBMQueue". As data is pushed into the queue, the -- source will read it and pass it down the conduit pipeline. When the -- queue is closed, the source will close also. sourceTBMQueue :: MonadIO m => TBMQueue a -> Source m a -sourceTBMQueue q = ConduitM src - where src = PipeM pull - pull = do mx <- liftSTM $ readTBMQueue q - case mx of - Nothing -> return $ Done () - Just x -> return $ HaveOutput src close x - close = do liftSTM $ closeTBMQueue q - return () +sourceTBMQueue q = + loop + where + loop = do + mx <- liftSTM $ readTBMQueue q + case mx of + Nothing -> return () + Just x -> yieldOr x close >> loop + close = liftSTM $ closeTBMQueue q -- | A simple wrapper around a "TBMQueue". As data is pushed into this sink, it -- will magically begin to appear in the queue. @@ -151,13 +131,9 @@ => TBMQueue a -> Bool -- ^ Should the queue be closed when the sink is closed? -> Sink a m () -sinkTBMQueue q shouldClose = ConduitM src - where src = sink - sink = NeedInput push close - push input = PipeM ((liftSTM $ writeTBMQueue q input) - >> (return $ NeedInput push close)) - close _ = do when shouldClose (liftSTM $ closeTBMQueue q) - return () +sinkTBMQueue q shouldClose = do + CL.mapM_ (liftSTM . writeTBMQueue q) + when shouldClose (liftSTM $ closeTBMQueue q) liftSTM :: forall (m :: * -> *) a. MonadIO m => STM a -> m a diff -ruN orig/stm-conduit.cabal new/stm-conduit.cabal --- orig/stm-conduit.cabal 2014-08-27 18:36:44.145176333 +0300 +++ new/stm-conduit.cabal 2014-08-27 18:36:43.000000000 +0300 @@ -29,7 +29,7 @@ , stm-chans >= 2.0 && < 3.1 , cereal >= 0.4.0.1 , cereal-conduit >= 0.7.2 - , conduit >= 1.0 && < 1.2 + , conduit >= 1.0 && < 1.3 , conduit-extra >= 1.0 && < 1.2 , directory >= 1.1 , resourcet >= 0.3 && < 1.2