Yesterday, while investigating another issue, I noticed a strange anomaly in the logs of an application I wrote and manage at work: we were sending duplicate messages from a queue to a third party over and over.
Since threading was involved, I suspected some thread safety/shared state issue in the code. A couple of coworkers didn't immediately spot anything wrong, so I wrote some simple test cases to check my assumptions about threading with lambdas.
Simple Example of the Problem
Below you will see me attempting to create 5 workers, each of which does a small unit of work and then writes to the console. I would expect each worker to write out the Int32 it was created with. My assumption was that the C# compiler would, for the lambda expression, create a copy of i for the thread being created and use that copy. I was entirely wrong.
As you can see, when each worker is created on the initial thread, Console.WriteLine prints the right value of i, but by the time the threads run, they all see the last value of i, 6 (the for loop increments it past its final value, causing the loop to exit). The lambda captures the variable i itself, not its value at the moment the thread was created, so all five threads share the same variable.
var rand = new Random();
var threads = new List<Thread>();

for (int i = 1; i <= 5; i++)
{
    Console.WriteLine("Creating worker {0}.", i);
    Thread t = new Thread(() =>
    {
        // Simulate work
        Thread.Sleep(rand.Next(500, 2000));
        Console.WriteLine("Finished running worker {0}.", i);
    });
    threads.Add(t);
}

threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());

/* Output:
Creating worker 1.
Creating worker 2.
Creating worker 3.
Creating worker 4.
Creating worker 5.
Finished running worker 6.
Finished running worker 6.
Finished running worker 6.
Finished running worker 6.
Finished running worker 6.
*/
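As an aside, the capture problem can also be avoided without changing how the thread is started: copying the loop variable into a local inside the loop body gives each lambda its own variable to close over. A minimal sketch of that approach (the workerId local is just an illustrative name, and it assumes the rand and threads variables from the snippet above):

for (int i = 1; i <= 5; i++)
{
    // Each iteration gets its own variable, so each closure captures its own copy
    int workerId = i;
    Console.WriteLine("Creating worker {0}.", workerId);
    Thread t = new Thread(() =>
    {
        // Simulate work
        Thread.Sleep(rand.Next(500, 2000));
        Console.WriteLine("Finished running worker {0}.", workerId);
    });
    threads.Add(t);
}

With this change each thread prints the value it was created with, because the closure captures the per-iteration workerId rather than the shared loop variable i.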
Simple Fix
The C# compiler did not make a copy of the state, but we can do so directly and pass it in using ParameterizedThreadStart. This makes the list a collection of Int32/Thread pairs. Obviously, in our actual app, the state object is more complex than an Int32; note that ParameterizedThreadStart hands the state to the lambda as an object, so a more complex state object would need a cast inside the thread.
var rand = new Random();
var threads = new List<Tuple<int, Thread>>();

for (int i = 1; i <= 5; i++)
{
    Console.WriteLine("Creating worker {0}.", i);
    Thread t = new Thread(x =>
    {
        // Simulate work
        Thread.Sleep(rand.Next(500, 2000));
        Console.WriteLine("Finished running worker {0}.", x);
    });
    threads.Add(Tuple.Create(i, t));
}

threads.ForEach(tuple => tuple.Item2.Start(tuple.Item1));
threads.ForEach(tuple => tuple.Item2.Join());

/* Output:
Creating worker 1.
Creating worker 2.
Creating worker 3.
Creating worker 4.
Creating worker 5.
Finished running worker 1.
Finished running worker 4.
Finished running worker 2.
Finished running worker 3.
Finished running worker 5.
*/
Better Fix
That works, but it doesn't really reflect the original intent. I receive X number of things to do, and in the real product a Semaphore was used to control the maximum number of messages being sent at a time.
For instance, if I received 200 messages from the queue to send and could send 50 messages at a time, I would spin up 200 threads that all waited on a semaphore, with at most 50 sending at once. Obviously this is inefficient, and I don't really have an excuse for doing it this way when I converted the process from a single-threaded one that could not keep up with demand to a multi-threaded one that ended up with this duplication problem. In retrospect, I would not do it this way again.
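For illustration only, the pattern described above looks roughly like the following sketch; the counts and the SendMessage call are hypothetical stand-ins, not the production code:

var gate = new Semaphore(50, 50);   // at most 50 sends in flight at once
var threads = new List<Thread>();

for (int i = 0; i < 200; i++)       // one thread per message received
{
    Thread t = new Thread(() =>
    {
        gate.WaitOne();             // block until a send slot is free
        try
        {
            // SendMessage(...);    // hypothetical: send one message to the third party
        }
        finally
        {
            gate.Release();         // free the slot for the next waiting thread
        }
    });
    threads.Add(t);
}

threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());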
The following example has a queue of 15 work items serviced by 5 worker threads, and is close to how the code works now.
Queue With Workers Serving It
var rand = new Random();
var threads = new List<Thread>();
var queueLocker = new object();
var queue = new Queue<int>();
const short maxWorkers = 5;

// Create dummy data for processing
for (int job = 1; job <= 15; job++)
{
    queue.Enqueue(job);
}

for (int i = 1; i <= maxWorkers; i++)
{
    Console.WriteLine("Creating worker {0}.", i);
    Thread t = new Thread(() =>
    {
        // Try to get a job from the queue to handle
        while (queue.Count > 0)
        {
            // Declared inside the loop so a stale job is never re-processed
            // if the queue empties between the outer check and the lock
            int? job = null;

            lock (queueLocker)
            {
                if (queue.Count > 0)
                {
                    job = queue.Dequeue();
                }
            }

            if (job.HasValue)
            {
                // Simulate work
                Thread.Sleep(rand.Next(500, 2000));
                Console.WriteLine(
                    "Worker {0} finished running job {1}.",
                    Thread.CurrentThread.Name, job);
            }
        }

        Console.WriteLine(
            "Worker {0} has no more work. Exiting.",
            Thread.CurrentThread.Name);
    });
    t.Name = i.ToString();
    threads.Add(t);
}

threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());

/* Output:
Creating worker 1.
Creating worker 2.
Creating worker 3.
Creating worker 4.
Creating worker 5.
Worker 4 finished running job 4.
Worker 5 finished running job 5.
Worker 1 finished running job 1.
Worker 2 finished running job 2.
Worker 3 finished running job 3.
Worker 1 finished running job 8.
Worker 2 finished running job 9.
Worker 4 finished running job 6.
Worker 5 finished running job 7.
Worker 3 finished running job 10.
Worker 1 finished running job 11.
Worker 1 has no more work. Exiting.
Worker 2 finished running job 12.
Worker 2 has no more work. Exiting.
Worker 4 finished running job 13.
Worker 4 has no more work. Exiting.
Worker 5 finished running job 14.
Worker 5 has no more work. Exiting.
Worker 3 finished running job 15.
Worker 3 has no more work. Exiting.
*/
Even Better Fix I Can’t Use
Unfortunately, this code is limited to .NET 3.5 right now, but this particular problem looks like a great match for the Task Parallel Library in .NET 4.0 and later. It would offload all of the thread handling to the framework, which is particularly well suited to this problem: running tasks in parallel.
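For illustration, if the project could move to .NET 4.0, the queue-with-workers example above might collapse to something like the sketch below using Parallel.ForEach; the MaxDegreeOfParallelism value is just an assumption mirroring the five workers above, and the snippet needs the System.Linq and System.Threading.Tasks namespaces:

var rand = new Random();
var randLocker = new object();

Parallel.ForEach(
    Enumerable.Range(1, 15),                             // the 15 dummy jobs
    new ParallelOptions { MaxDegreeOfParallelism = 5 },  // cap concurrent jobs, like the 5 workers
    job =>
    {
        // Simulate work (Random is not thread-safe, so guard the shared instance)
        int delay;
        lock (randLocker) { delay = rand.Next(500, 2000); }
        Thread.Sleep(delay);
        Console.WriteLine("Finished running job {0}.", job);
    });

The framework decides how many threads to create and how to hand jobs to them, so the manual queue, lock, and join bookkeeping disappears.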