-
Notifications
You must be signed in to change notification settings - Fork 1
2.1 RabbitMQ Integration Guide
Márcio Azevedo edited this page May 5, 2016
·
9 revisions
Source Code: FSharp.DataProcessingPipelines.RabbitMQ.
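To start using it (assuming you already have a RabbitMQ instance — if you don't, use, for example, the CloudAMQP free plan or install your own instance), follow the steps below. This example assumes the following scenario: Service A publishes a message to a topic, and Service B subscribes to that topic and prints each message it receives.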
-
Install the FSharp.DataProcessingPipelines.RabbitMQ package and its dependencies (via NuGet or Paket).
-
Add an F# module to your F# project (also available as a gist):
open System
open System.Collections.Generic
open System.Globalization
open System.Threading
open RabbitMQ.Client
open EasyNetQ
open FSharp.DataProcessingPipelines.Core
open FSharp.DataProcessingPipelines.Core.Messages
open FSharp.DataProcessingPipelines.Core.Pipes
open FSharp.DataProcessingPipelines.Core.Filters
open FSharp.DataProcessingPipelines.Core.Runners
open FSharp.DataProcessingPipelines.Infrastructure.RabbitMQ

/// Message exchanged between the services: an integer id plus a list of
/// (timestamp, text) events.
type BaseMessage (id:int, events:(DateTime * String) list) =
    member this.Id = id
    member this.Events = events

/// Input pipe used by Service B. Adds nothing of its own: every constructor
/// argument is forwarded to RabbitMQInputPipe<BaseMessage>.
type ServiceBInputPipe (serviceBus:IBus, subscriberId:String, topic:String, locale:String) =
    inherit RabbitMQInputPipe<BaseMessage> (serviceBus, subscriberId, topic, locale)

/// Output pipe used by Service A. Adds nothing of its own: both constructor
/// arguments are forwarded to RabbitMQOutputPipe<BaseMessage>.
type ServiceAOutputPipe (serviceBus:IBus, topic:String) =
    inherit RabbitMQOutputPipe<BaseMessage> (serviceBus, topic)

/// Filter for Service A: builds one test message and publishes it through
/// the filter's output pipe.
type ServiceAFilter (pipe:ServiceAOutputPipe) =
    inherit DataSource<BaseMessage>(pipe)

    /// Creates a BaseMessage with id 1 and a single event stamped with
    /// DateTime.Now, then publishes it. Any exception is reported on stderr
    /// and not propagated, so a failed publish never escapes Execute.
    override this.Execute () =
        try
            try
                let msg = BaseMessage(1, [(DateTime.Now, "Test Message created in F# by the ServiceAFilter!")])
                printfn "--- Service A Publishes Msg ---"
                this.OutputPipe.Publish msg
            finally
                // Dispose if needed
                ()
        with
        | ex ->
            // Previously the exception was dropped without a trace; surface it
            // so a failed publish is visible while staying best-effort.
            eprintfn "ServiceAFilter failed to publish: %O" ex

/// Filter for Service B: subscribes a handler on the filter's input pipe
/// that prints every received message.
type ServiceBFilter (pipe:ServiceBInputPipe) =
    inherit DataSink<BaseMessage>(pipe)

    /// Registers the printing handler via this.InputPipe.Subscribe.
    override this.Execute () =
        let handler (msg:BaseMessage) =
            printfn "--- Service B Execute -> %d: " (msg.Id)
            for (timestamp, text) in msg.Events do
                printfn "(%A, %s)" timestamp text
            // NOTE(review): the flattened original does not show whether this
            // separator was inside the loop; it is printed once per message here.
            printfn "-----------------------------------------"
        this.InputPipe.Subscribe (handler)

/// Connection string passed to RabbitHutch.CreateBus.
let ServiceBusHost = "host=localhost" //TODO: set an existing RabbitMQ host!
// Locale argument handed to the input pipe.
let Culture = "en-US"

// Subscriber ids and topic names for each service.
let ServiceASubscriberId = "ServiceASubscriberId"
let ServiceAInputPipeTopic = "ServiceAInputPipeTopic"
let ServiceAOutputPipeTopic = "ServiceAOutputPipeTopic"

let ServiceBSubscriberId = "ServiceBSubscriberId"
// Deliberately the same string as ServiceAOutputPipeTopic, so Service B
// listens on the topic Service A publishes to.
let ServiceBInputPipeTopic = "ServiceAOutputPipeTopic"
let ServiceBOutputPipeTopic = "ServiceDInputPipeTopic"

// Create the EasyNetQ bus. On failure, print the message and inner exception,
// then rethrow.
let serviceBus =
    try
        RabbitHutch.CreateBus(ServiceBusHost)
    with
    | ex ->
        printfn "%A %A" (ex.Message) (ex.InnerException)
        // reraise () keeps the original stack trace; `raise ex` would reset it.
        reraise ()

// Wire each pipe to the topic constant defined for it. Both pipes used to be
// given ServiceAInputPipeTopic, which left the two constants below unused;
// they hold the same string, so publisher and subscriber still agree.
let outputPipe = new ServiceAOutputPipe(serviceBus, ServiceAOutputPipeTopic)
let inputPipe = new ServiceBInputPipe(serviceBus, ServiceBSubscriberId, ServiceBInputPipeTopic, Culture)

let myRunnerA = BaseRunner (ServiceAFilter (outputPipe))
let myRunnerB = BaseRunner (ServiceBFilter (inputPipe))

// NOTE(review): the publisher is started before the subscriber. If Start runs
// the filter immediately, the first message may be published before Service B
// has subscribed — confirm against BaseRunner and consider starting B first.
myRunnerA.Start ()
myRunnerB.Start ()