Objective
Apache Kafka represents a powerful tool for linking microservices in a distributed system with a focus on processing and producing streams of interesting data.
If you're new, check out a quick introduction to the important concepts in Apache Kafka.
RdKafka is a C-native library for interacting with Apache Kafka that is used in a wide variety of systems and a nice C# wrapper is available for it.
Let's look at how to use these from F# to support coordinated, high-performance microservices.
Dependencies
-
RdKafka from NuGet, or using Paket:
./.paket/paket.exe add nuget RdKafka project MyProject
-
Pre-build event to transfer native libraries (64-bit): (see: DllNotFoundException)
xcopy /y /d /f "$(ProjectDir)..\packages\RdKafka.Internal.librdkafka\runtimes\win7-x64\native\*.*" "$(TargetDir)"
Reference and open the C# wrapper:
#r "./packages/RdKafka/lib/net451/RdKafka.dll"
open RdKafka
Terminology
Kafka
type Topic = string // https://kafka.apache.org/intro#intro_topics
and Partition = int // partition of a topic
and Offset = int64 // offset position within a partition of a topic
and BrokerCsv = string // connection string of "broker1:9092,broker2:9092"
// with protocol {http://,tcp://} removed
and ConsumerName = string // https://kafka.apache.org/intro#intro_consumers
and ErrorReason = string
RdKafka Events
RdKafka provides much better transparency than other .NET kafka libraries by exposing a wide variety of events using callbacks. Here we define an F# union type for the various cases.
type Event =
| Statistics of string
| OffsetCommit of ErrorCode*List<Topic*Partition*Offset>
| EndReached of Topic*Partition*Offset
| PartitionsAssigned of List<Topic*Partition*Offset>
| PartitionsRevoked of List<Topic*Partition*Offset>
| ConsumerError of ErrorCode
| Error of ErrorCode*string
Converting RdKafka Types
Following are some simple conversions to F# datatypes that are used throughput our samples:
let ofTopicPartitionOffset (tpo:TopicPartitionOffset) =
tpo.Topic,tpo.Partition,tpo.Offset
let ofTopicPartitionOffsets =
Seq.map ofTopicPartitionOffset >> Seq.toList
let ofCommit (oca:Consumer.OffsetCommitArgs) =
oca.Error,ofTopicPartitionOffsets oca.Offsets
let ofError (ea:Handle.ErrorArgs) =
ea.ErrorCode,ea.Reason
Logging RdKafka Events
Consider using an exhaustive pattern match on the Event union type to provide highly granular logging. As you're working with the library, these cases will give you valuable insight:
let toLog = function
| Event.Error _ -> ()
//| ...
You can easily attach your toLog
function to callbacks from both the producer and consumer
by mapping the callbacks into cases of the Event
type:
let fromConsumerToLog (c:EventConsumer) =
c.OnStatistics.Add(Statistics >> toLog)
c.OnOffsetCommit.Add(ofCommit >> OffsetCommit >> toLog)
c.OnEndReached.Add(ofTopicPartitionOffset >> EndReached >> toLog)
c.OnPartitionsAssigned.Add(ofTopicPartitionOffsets >> PartitionsAssigned >> toLog)
c.OnPartitionsRevoked.Add(ofTopicPartitionOffsets >> PartitionsRevoked >> toLog)
c.OnConsumerError.Add(ConsumerError >> toLog)
c.OnError.Add(ofError >> Error >> toLog)
let fromProducerToLog (p:Producer) =
p.OnError.Add(ofError >> Error >> toLog)
p.OnStatistics.Add(Statistics >> toLog)
Configuration and Connection
When you connect to RdKafka, you can configure any of the defaults in the underlying native library. In particular, there are several settings you may want to consider:
-
a consumer GroupId, shared by all cooperating instances of a microservice
- note: for rdkafka 0.9.1 or earlier, setting GroupId on a producer may block Dispose()
- whether or not to EnableAutoCommit for tracking your current offset position
- whether to save the offsets on the broker for coordination
- if your Kafka cluster runs an idle connection reaper, disconnection messages will appear at even intervals when idle
- a metadata broker list workaround enables you to query additional metadata using the native wrapper
-
where to start a brand new consumer group:
smallest
starts processing from the earliest offsets in the topiclargest
, the default, starts from the newest message offsets
let connect (brokerCsv:BrokerCsv) (group:ConsumerName) (autoCommit:bool) =
let config = new Config()
config.GroupId <- group // (1)
config.StatisticsInterval <- TimeSpan.FromSeconds(1.0)
config.EnableAutoCommit <- autoCommit // (2)
config.["offset.store.method"] <- "broker" // (3)
config.["log.connection.close"] <- "false" // (4)
config.["metadata.broker.list"] <- brokerCsv // (5)
config.DefaultTopicConfig <-
let topicConfig = new TopicConfig()
topicConfig.["auto.offset.reset"] <- "smallest" // (6)
topicConfig
new EventConsumer(config, brokerCsv),
new Producer(config, brokerCsv)
Publishing
A partition key and payload can then be published to a topic. The response
includes a partition and offset position confirming the write. Consider
Encoding.UTF8.GetBytes
if your message is text.
let publish (brokerCsv:BrokerCsv) (group:ConsumerName) (topic:Topic) =
let consumer,producer = connect brokerCsv group false
let topic = producer.Topic(topic)
fun (key:byte[], payload:byte[]) -> async {
let! report = Async.AwaitTask(topic.Produce(payload=payload,key=key))
return report.Partition, report.Offset
}
Subscribing
To consume, on partition assignment we select Offset.Stored
, which defaults to the value of
auto.offset.reset
if no stored offset exists. Messages are then sent to the onMessage callback
once the topic subscription starts.
let subscribeCallback (brokerCsv) (group) (topic:Topic) (onMessage) =
let autoCommit = true
let consumer,producer = connect brokerCsv group autoCommit
consumer.OnPartitionsAssigned.Add(
ofTopicPartitionOffsets
>> List.map (fun (t,p,o) -> t,p,Offset.Stored)
>> List.map TopicPartitionOffset
>> Collections.Generic.List<_>
>> consumer.Assign)
consumer.OnMessage.Add(onMessage)
consumer.Subscribe(new Collections.Generic.List<string>([topic]))
consumer.Start()
The above works quite well assuming you process the message to completion within the callback.
You may want to process larger batches of messages asynchronously, however. To use a sequence instead, a blocking collection can buffer incoming messages as they're received. A sequence generator yielding messages from this buffer is returned to the client:
let subscribeSeq brokerCsv group topic : seq<Partition*Offset*byte[]> =
Buffering more than 3000 messages will hold+block the callback until the client has consumed from the sequence.
let buffer = 3000
let messages =
new Collections.Concurrent.BlockingCollection<Message>(
new Collections.Concurrent.ConcurrentQueue<Message>(), buffer)
consumer.OnMessage.Add(messages.Add)
consumer.Subscribe(new Collections.Generic.List<string>([topic]))
consumer.Start()
Seq.initInfinite(fun _ ->
let message = messages.Take()
message.Partition, message.Offset, message.Payload) // or AsyncSeq :)
At this point, we make an important observation: the native client will autocommit offsets acknowledging that a message in the buffer has been processed even though it may not have been dequeued. From RdKafka's point of view, the callback for that message has completed!
This is when you'll want to manage offsets yourself.
Manual Offsets
For a given partition, we need to know which messages have started processing. These
are the Active
messages that we do not yet want to commit. When a message has completed
we move it to Processed
.
It's possible that processing may complete out of order! To account for this, the Next
offset to commit must be less than the oldest Active
message.
type Offsets =
{ Next : Offset option // the next offset to be committed
Active : Offset list // offsets of active messages (started processing)
Processed : Offset list } // offsets of processed messages newer than (>) any
// active message, e.g.:
// still working on 4L, but 5L & 6L are processed
In the following module:
start
adds a message to theActive
setfinish
moves a message from theActive
set toProcessed
update
adjusts theNext
offset to commit (based on any changes above)
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Offsets =
let empty = { Next=None; Processed=[]; Active=[] }
let private update = function
| { Processed=[] } as x -> x
| { Active=[] } as x -> { x with Processed=[]; Next=List.max x.Processed |> Some }
| x -> x.Processed
|> List.partition ((>=) (List.min x.Active))
|> function | [], _ -> x
| c, p -> { x with Processed=p; Next=List.max c |> Some }
let start (x:Offset) (xs:Offsets) = update { xs with Active = x :: xs.Active }
let finish (x:Offset) (xs:Offsets) = update { xs with Active = List.filter ((<>) x) xs.Active
Processed = x :: xs.Processed }
Since the Offsets
above apply to an individual partition, we want to be able to track
all current partitions. This follows the same lifecycle we've seen so far:
- a partition is assigned to us
- a message starts processing
- a message finishes processing
- a partition may be revoked (and assigned to another instance)
type Partitions = Map<Partition, Offsets>
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Partitions =
let assign (p:Partition) (xs:Partitions) : Partitions =
match Map.tryFind p xs with
| Some _ -> xs
| None -> Map.add p Offsets.empty xs
let start (p:Partition, o:Offset) (xs:Partitions) : Partitions =
match Map.tryFind p xs with
| Some offsets -> Map.add p (Offsets.start o offsets) xs
| None -> xs
let finish (p:Partition, o:Offset) (xs:Partitions) : Partitions =
match Map.tryFind p xs with
| Some offsets -> Map.add p (Offsets.finish o offsets) xs
| None -> xs
let revoke (p:Partition) (xs:Partitions) : Partitions =
Map.remove p xs
Finally, we want the next offset to commit for each partition assigned to us. Our client can then commit a checkpoint to record completion of all messages up to this point.
let checkpoint : Partitions -> List<Partition*Offset> =
Map.toList >> List.choose (fun (p,o) -> o.Next |> Option.map(fun o -> p,1L+o))
Integrating & Committing Offsets
This example uses an F# mailbox processor to aggregate our progress from multiple threads. There are several ways to solve this problem! This particular solution can play nicely with asynchronous workflows and is fairly concise.
The actor observes the following:
- processing started or finished on a message at
Offset
ofPartition
- active
Partitions
have been assigned to us, or revoked from us (i.e. assigned to another consumer in our group)
type OffsetMonitor = OffsetMonitorMessage->unit
and OffsetMonitorMessage =
| Start of Partition * Offset | Finish of Partition * Offset // (1)
| Assign of Partition list | Revoke of Partition list // (2)
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module OffsetMonitor =
let assign m = List.map (fun (_,p,_) -> p) >> Assign >> m
let revoke m = List.map (fun (_,p,_) -> p) >> Revoke >> m
let start m (x:Message) = (x.Partition, x.Offset) |> Start |> m
let finish m (x:Message) = (x.Partition, x.Offset) |> Finish |> m
let create (consumer:EventConsumer) (topic:Topic) =
let integrate = function
| Start (p,o) -> Partitions.start (p,o)
| Finish (p,o) -> Partitions.finish (p,o)
| Assign (ps) -> List.foldBack Partitions.assign (ps)
| Revoke (ps) -> List.foldBack Partitions.revoke (ps)
Here, every 45 seconds our offset monitor will commit outstanding checkpoints. Note that a Kafka cluster may reassign your partitions if you wait too long to report progress.
MailboxProcessor<OffsetMonitorMessage>.Start(fun inbox ->
let rec loop(watch:Stopwatch, partitions:Partitions) = async {
if watch.Elapsed.TotalSeconds > 45. then
let! result =
Partitions.checkpoint partitions
|> List.map (fun (p,o) -> TopicPartitionOffset(topic,p,o))
|> Collections.Generic.List
|> consumer.Commit
|> Async.AwaitTask
|> Async.Catch
return! loop(Stopwatch.StartNew(),partitions)
else
let! message = inbox.TryReceive(1000)
return!
match message with
| None -> loop(watch, partitions)
| Some m -> loop(watch, partitions |> integrate m)
}
loop(Stopwatch.StartNew(), Map.empty)).Post
Subscribing (Parallelism + Manual Offset Management)
Let's suppose we want to process multiple messages at a time on multiple threads.
To tie all of this together, we create a hybrid of subscribeCallback
and subscribeSeq
functions above. It accepts an onMessage callback, tracks all offsets, and executes using
some degree of concurrency.
let subscribeParallel concurrency brokerCsv group topic onMessage =
- when a partition is assigned, we start tracking its offsets
- before business logic, offsets are marked as active
- after business logic, offsets are marked as processed
- when a partition is revoked, we stop tracking its offsets
let monitor = OffsetMonitor.create consumer topic
consumer.OnPartitionsAssigned.Add(
ofTopicPartitionOffsets >> OffsetMonitor.assign monitor) // (1)
consumer.OnPartitionsRevoked.Add(
ofTopicPartitionOffsets >> OffsetMonitor.revoke monitor) // (4)
let onMessage(message:Message) = async {
OffsetMonitor.start monitor message // (2)
do! onMessage(message)
OffsetMonitor.finish monitor message } // (3)
consumer.Subscribe(new Collections.Generic.List<string>([topic]))
consumer.Start()
Finally, take the messageSeq from subscribeSeq, and apply onMessage
with
the specified degree of concurrency. (using AsyncSeq, this time :) )
messageSeq
|> AsyncSeq.ofSeq
|> AsyncSeq.iterAsyncParThrottled concurrency onMessage
Supervision
Supervising progress of a microservice running RdKafka depends on monitoring two things:
- the range of messages available in a topic (i.e. its
Watermark
offsets), and - the current
Checkpoint
offsets for a consumer group relative to thoseWatermarks
Watermarks
A line drawn on the side of an empty ship is its low water line. When you put cargo on the ship, it sits lower in water. The line drawn on the side of a fully loaded ship is its high water line. The same terminology is used here:
type Watermark = // watermark high/low offset for each partition of a topic
{ Topic : string // topic
Partition : int // partition of the topic
High : int64 // highest offset available in this partition (newest message)
Low : int64 } // lowest offset available in this partition (oldest message)
To query these watermarks:
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Watermark =
let queryWithTimeout (timeout) (producer:Producer) (consumer:Consumer) (topic:Topic) =
let queryWatermark(t, p) =
async {
let! w =
consumer.QueryWatermarkOffsets(TopicPartition(t, p), timeout)
|> Async.AwaitTask
return { Topic=t; Partition=p; High=w.High; Low=w.Low }
}
async {
let topic = producer.Topic(topic)
let! metadata =
producer.Metadata(onlyForTopic=topic, timeout=timeout)
|> Async.AwaitTask
return!
metadata.Topics
|> Seq.collect(fun t -> [for p in t.Partitions -> t.Topic, p.PartitionId])
|> Seq.sort
|> Seq.map queryWatermark
|> Async.Parallel // again, consider AsyncSeq instead :)
}
Checkpoints
Your consumer group's current position is composed of an offset position within each partition of the topic:
type Checkpoint = List<Partition*Offset>
If you're monitoring within an active consumer, you have access to the absolute latest offsets completed for each partition. When multiple consumers are working together in a group, however, each has only a partial view of the overall progress.
It's possible to query the broker for the latest committed checkpoint for a consumer group across all partitions of the topic:
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Checkpoint =
let queryWithTimeout (timeout) (producer:Producer) (consumer:Consumer) (topic:Topic) =
async {
let! metadata =
producer.Metadata(onlyForTopic=producer.Topic(topic), timeout=timeout)
|> Async.AwaitTask
let partitions =
metadata.Topics
|> Seq.collect(fun t -> [for p in t.Partitions -> t.Topic, p.PartitionId])
|> Seq.sort
|> Seq.map (fun (t,p) -> new TopicPartition(t,p))
let! committed =
consumer.Committed(new Collections.Generic.List<_>(partitions), timeout)
|> Async.AwaitTask
let checkpoint : Checkpoint =
committed
|> Seq.map (fun tpo -> tpo.Partition, tpo.Offset)
|> Seq.sortBy fst
|> Seq.toList
return checkpoint
}
Over time, your current offset in each partition should increase as your consumer group processes messages. Similarly, the high watermark will also increase as new messages are added. The difference between your high watermark and your current position is your lag.
Using these figures, you can measure your performance relative to any service level agreement in effect for your microservice, and potentially take corrective action - such as scaling the number of consumer instances or size of machines.
Summary
With an eye to building top-tier microservices, we looked at:
- logging callbacks in RdKafka to achieve better visibility
- configuring the client for flexibility in several useful scenarios
-
publishing and subscribing, including:
- scaling the client as partitions are redistributed
- maintaining an at-least-once guarantee for message processing at scale
- monitoring overall progress using watermarks and committed checkpoints
- and we barely even scratched the surface ;)
If you enjoy working with F# and Kafka, I also encourage you to check out Kafunk - an open source client (written entirely in F# !) under development at Jet.
Thanks for visiting ~ have fun !
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Topic
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Partition
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Offset
val int64 : value:'T -> int64 (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int64
--------------------
type int64 = Int64
Full name: Microsoft.FSharp.Core.int64
--------------------
type int64<'Measure> = int64
Full name: Microsoft.FSharp.Core.int64<_>
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.BrokerCsv
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ConsumerName
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ErrorReason
module Event
from Microsoft.FSharp.Control
--------------------
type Event =
| Statistics of string
| OffsetCommit of ErrorCode * List<Topic * Partition * Offset>
| EndReached of Topic * Partition * Offset
| PartitionsAssigned of List<Topic * Partition * Offset>
| PartitionsRevoked of List<Topic * Partition * Offset>
| ConsumerError of ErrorCode
| Error of ErrorCode * string
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Event
--------------------
type Event<'T> =
new : unit -> Event<'T>
member Trigger : arg:'T -> unit
member Publish : IEvent<'T>
Full name: Microsoft.FSharp.Control.Event<_>
--------------------
type Event<'Delegate,'Args (requires delegate and 'Delegate :> Delegate)> =
new : unit -> Event<'Delegate,'Args>
member Trigger : sender:obj * args:'Args -> unit
member Publish : IEvent<'Delegate,'Args>
Full name: Microsoft.FSharp.Control.Event<_,_>
--------------------
new : unit -> Event<'T>
--------------------
new : unit -> Event<'Delegate,'Args>
| _BEGIN = -200
| _BAD_MSG = -199
| _BAD_COMPRESSION = -198
| _DESTROY = -197
| _FAIL = -196
| _TRANSPORT = -195
| _CRIT_SYS_RESOURCE = -194
| _RESOLVE = -193
| _MSG_TIMED_OUT = -192
| _PARTITION_EOF = -191
...
Full name: RdKafka.ErrorCode
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IEnumerable
interface IEnumerable<'T>
member GetSlice : startIndex:int option * endIndex:int option -> 'T list
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
member Tail : 'T list
static member Cons : head:'T * tail:'T list -> 'T list
static member Empty : 'T list
Full name: Microsoft.FSharp.Collections.List<_>
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ofTopicPartitionOffset
type TopicPartitionOffset =
struct
new : topic:string * partition:int * offset:int64 -> TopicPartitionOffset
member Offset : int64 with get, set
member Partition : int with get, set
member ToString : unit -> string
member Topic : string with get, set
end
Full name: RdKafka.TopicPartitionOffset
--------------------
TopicPartitionOffset()
TopicPartitionOffset(topic: string, partition: int, offset: int64) : unit
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ofTopicPartitionOffsets
from Microsoft.FSharp.Collections
Full name: Microsoft.FSharp.Collections.Seq.map
Full name: Microsoft.FSharp.Collections.Seq.toList
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ofCommit
type Consumer =
inherit Handle
new : config:Config * ?brokerList:string -> Consumer
member Assign : partitions:List<TopicPartitionOffset> -> unit
member Assignment : List<TopicPartition>
member Commit : unit -> Task + 2 overloads
member Committed : partitions:List<TopicPartition> * timeout:TimeSpan -> Task<List<TopicPartitionOffset>>
member Consume : timeout:TimeSpan -> Nullable<MessageAndError>
member Dispose : unit -> unit
member GetWatermarkOffsets : topicPartition:TopicPartition -> Offsets
member Position : partitions:List<TopicPartition> -> List<TopicPartitionOffset>
member Subscribe : topics:List<string> -> unit
...
nested type OffsetCommitArgs
Full name: RdKafka.Consumer
--------------------
Consumer(config: Config, ?brokerList: string) : unit
struct
member Error : ErrorCode with get, set
member Offsets : IList<TopicPartitionOffset> with get, set
end
Full name: RdKafka.Consumer.OffsetCommitArgs
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.ofError
type Handle =
new : unit -> Handle
member Dispose : unit -> unit
member ListGroup : group:string * timeout:TimeSpan -> Task<GroupInfo>
member ListGroups : timeout:TimeSpan -> Task<List<GroupInfo>>
member LogLevel : -> int with set
member MemberId : string
member Metadata : ?allTopics:bool * ?onlyForTopic:Topic * ?includeInternal:bool * ?timeout:TimeSpan -> Task<Metadata>
member Name : string
member OutQueueLength : int64
member QueryWatermarkOffsets : topicPartition:TopicPartition * ?timeout:TimeSpan -> Task<Offsets>
...
nested type ErrorArgs
Full name: RdKafka.Handle
--------------------
Handle() : unit
struct
member ErrorCode : ErrorCode with get, set
member Reason : string with get, set
end
Full name: RdKafka.Handle.ErrorArgs
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.toLog
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.fromConsumerToLog
type EventConsumer =
inherit Consumer
new : config:Config * ?brokerList:string -> EventConsumer
member Dispose : unit -> unit
member Start : unit -> unit
member Stop : unit -> Task
event OnMessage : EventHandler<Message>
event OnConsumerError : EventHandler<ErrorCode>
event OnEndReached : EventHandler<TopicPartitionOffset>
Full name: RdKafka.EventConsumer
--------------------
EventConsumer(config: Config, ?brokerList: string) : unit
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.fromProducerToLog
type Producer =
inherit Handle
new : brokerList:string -> Producer + 1 overload
member Topic : topic:string * ?config:TopicConfig -> Topic
Full name: RdKafka.Producer
--------------------
Producer(brokerList: string) : unit
Producer(config: Config, ?brokerList: string) : unit
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.connect
Full name: Microsoft.FSharp.Core.bool
type Config =
new : unit -> Config
member DefaultTopicConfig : TopicConfig with get, set
member Dump : unit -> Dictionary<string, string>
member EnableAutoCommit : bool with get, set
member GroupId : string with get, set
member Item : string -> string with get, set
member Logger : LogCallback with get, set
member StatisticsInterval : TimeSpan with get, set
nested type LogCallback
Full name: RdKafka.Config
--------------------
Config() : unit
type TimeSpan =
struct
new : ticks:int64 -> TimeSpan + 3 overloads
member Add : ts:TimeSpan -> TimeSpan
member CompareTo : value:obj -> int + 1 overload
member Days : int
member Duration : unit -> TimeSpan
member Equals : value:obj -> bool + 1 overload
member GetHashCode : unit -> int
member Hours : int
member Milliseconds : int
member Minutes : int
...
end
Full name: System.TimeSpan
--------------------
TimeSpan()
TimeSpan(ticks: int64) : unit
TimeSpan(hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int) : unit
TimeSpan(days: int, hours: int, minutes: int, seconds: int, milliseconds: int) : unit
type TopicConfig =
new : unit -> TopicConfig
member CustomPartitioner : Partitioner with get, set
member Dump : unit -> Dictionary<string, string>
member Item : string -> string with get, set
nested type Partitioner
Full name: RdKafka.TopicConfig
--------------------
TopicConfig() : unit
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.publish
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = Byte
Full name: Microsoft.FSharp.Core.byte
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
static member Async.AwaitTask : task:Threading.Tasks.Task<'T> -> Async<'T>
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.subscribeCallback
Full name: Microsoft.FSharp.Collections.List.map
namespace System.Collections
--------------------
namespace Microsoft.FSharp.Collections
type List<'T> =
new : unit -> List<'T> + 2 overloads
member Add : item:'T -> unit
member AddRange : collection:IEnumerable<'T> -> unit
member AsReadOnly : unit -> ReadOnlyCollection<'T>
member BinarySearch : item:'T -> int + 2 overloads
member Capacity : int with get, set
member Clear : unit -> unit
member Contains : item:'T -> bool
member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
member CopyTo : array:'T[] -> unit + 2 overloads
...
nested type Enumerator
Full name: System.Collections.Generic.List<_>
--------------------
Collections.Generic.List() : unit
Collections.Generic.List(capacity: int) : unit
Collections.Generic.List(collection: Collections.Generic.IEnumerable<'T>) : unit
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.subscribeSeq
val seq : sequence:seq<'T> -> seq<'T>
Full name: Microsoft.FSharp.Core.Operators.seq
--------------------
type seq<'T> = Collections.Generic.IEnumerable<'T>
Full name: Microsoft.FSharp.Collections.seq<_>
type BlockingCollection<'T> =
new : unit -> BlockingCollection<'T> + 3 overloads
member Add : item:'T -> unit + 1 overload
member BoundedCapacity : int
member CompleteAdding : unit -> unit
member CopyTo : array:'T[] * index:int -> unit
member Count : int
member Dispose : unit -> unit
member GetConsumingEnumerable : unit -> IEnumerable<'T> + 1 overload
member IsAddingCompleted : bool
member IsCompleted : bool
...
Full name: System.Collections.Concurrent.BlockingCollection<_>
--------------------
BlockingCollection() : unit
BlockingCollection(boundedCapacity: int) : unit
BlockingCollection(collection: IProducerConsumerCollection<'T>) : unit
BlockingCollection(collection: IProducerConsumerCollection<'T>, boundedCapacity: int) : unit
struct
member Key : byte[] with get, set
member Offset : int64 with get, set
member Partition : int with get, set
member Payload : byte[] with get, set
member Topic : string with get, set
member TopicPartitionOffset : TopicPartitionOffset
end
Full name: RdKafka.Message
type ConcurrentQueue<'T> =
new : unit -> ConcurrentQueue<'T> + 1 overload
member CopyTo : array:'T[] * index:int -> unit
member Count : int
member Enqueue : item:'T -> unit
member GetEnumerator : unit -> IEnumerator<'T>
member IsEmpty : bool
member ToArray : unit -> 'T[]
member TryDequeue : result:'T -> bool
member TryPeek : result:'T -> bool
Full name: System.Collections.Concurrent.ConcurrentQueue<_>
--------------------
ConcurrentQueue() : unit
ConcurrentQueue(collection: Collections.Generic.IEnumerable<'T>) : unit
BlockingCollection.Add(item: Message, cancellationToken: Threading.CancellationToken) : unit
Full name: Microsoft.FSharp.Collections.Seq.initInfinite
BlockingCollection.Take(cancellationToken: Threading.CancellationToken) : Message
{Next: Offset option;
Active: Offset list;
Processed: Offset list;}
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Offsets
Full name: Microsoft.FSharp.Core.option<_>
Full name: Microsoft.FSharp.Collections.list<_>
type CompilationRepresentationAttribute =
inherit Attribute
new : flags:CompilationRepresentationFlags -> CompilationRepresentationAttribute
member Flags : CompilationRepresentationFlags
Full name: Microsoft.FSharp.Core.CompilationRepresentationAttribute
--------------------
new : flags:CompilationRepresentationFlags -> CompilationRepresentationAttribute
| None = 0
| Static = 1
| Instance = 2
| ModuleSuffix = 4
| UseNullAsTrueValue = 8
| Event = 16
Full name: Microsoft.FSharp.Core.CompilationRepresentationFlags
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetsModule.empty
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetsModule.update
Full name: Microsoft.FSharp.Collections.List.max
Full name: Microsoft.FSharp.Collections.List.partition
Full name: Microsoft.FSharp.Collections.List.min
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetsModule.start
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetsModule.finish
Full name: Microsoft.FSharp.Collections.List.filter
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Partitions
module Map
from Microsoft.FSharp.Collections
--------------------
type Map<'Key,'Value (requires comparison)> =
interface IEnumerable
interface IComparable
interface IEnumerable<KeyValuePair<'Key,'Value>>
interface ICollection<KeyValuePair<'Key,'Value>>
interface IDictionary<'Key,'Value>
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
member Add : key:'Key * value:'Value -> Map<'Key,'Value>
member ContainsKey : key:'Key -> bool
override Equals : obj -> bool
member Remove : key:'Key -> Map<'Key,'Value>
...
Full name: Microsoft.FSharp.Collections.Map<_,_>
--------------------
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.assign
Full name: Microsoft.FSharp.Collections.Map.tryFind
Full name: Microsoft.FSharp.Collections.Map.add
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.start
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.finish
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.revoke
Full name: Microsoft.FSharp.Collections.Map.remove
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.PartitionsModule.checkpoint
Full name: Microsoft.FSharp.Collections.Map.toList
Full name: Microsoft.FSharp.Collections.List.choose
from Microsoft.FSharp.Core
Full name: Microsoft.FSharp.Core.Option.map
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitor
| Start of Partition * Offset
| Finish of Partition * Offset
| Assign of Partition list
| Revoke of Partition list
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorMessage
Full name: Microsoft.FSharp.Core.unit
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.assign
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.revoke
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.start
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.finish
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitorModule.create
module Partitions
from 2016-12-08-rdkafka-for-fsharp-microservices
--------------------
type Partitions = Map<Partition,Offsets>
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Partitions
Full name: Microsoft.FSharp.Collections.List.foldBack
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
type Stopwatch =
new : unit -> Stopwatch
member Elapsed : TimeSpan
member ElapsedMilliseconds : int64
member ElapsedTicks : int64
member IsRunning : bool
member Reset : unit -> unit
member Restart : unit -> unit
member Start : unit -> unit
member Stop : unit -> unit
static val Frequency : int64
...
Full name: System.Diagnostics.Stopwatch
--------------------
Stopwatch() : unit
Consumer.Commit(offsets: Collections.Generic.List<TopicPartitionOffset>) : Threading.Tasks.Task
Consumer.Commit(message: Message) : Threading.Tasks.Task
Full name: Microsoft.FSharp.Collections.Map.empty
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.AsyncSeq.ofSeq
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.AsyncSeq.iterAsyncParThrottled
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.subscribeParallel
module OffsetMonitor
from 2016-12-08-rdkafka-for-fsharp-microservices
--------------------
type OffsetMonitor = OffsetMonitorMessage -> unit
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.OffsetMonitor
from 2016-12-08-rdkafka-for-fsharp-microservices
{Topic: string;
Partition: int;
High: int64;
Low: int64;}
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Watermark
Watermark.Topic: string
--------------------
type Topic = string
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Topic
Watermark.Partition: int
--------------------
type Partition = int
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Partition
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.WatermarkModule.queryWithTimeout
type TopicPartition =
struct
new : topic:string * partition:int -> TopicPartition
member Partition : int with get, set
member ToString : unit -> string
member Topic : string with get, set
end
Full name: RdKafka.TopicPartition
--------------------
TopicPartition()
TopicPartition(topic: string, partition: int) : unit
Full name: Microsoft.FSharp.Collections.Seq.collect
Full name: Microsoft.FSharp.Collections.Seq.sort
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Checkpoint
module Checkpoint
from 2016-12-08-rdkafka-for-fsharp-microservices
--------------------
type Checkpoint = List<Partition * Offset>
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.Checkpoint
Full name: 2016-12-08-rdkafka-for-fsharp-microservices.CheckpointModule.queryWithTimeout
Full name: Microsoft.FSharp.Collections.Seq.sortBy
Full name: Microsoft.FSharp.Core.Operators.fst