Korzystanie z jednej listy przy uruchomionych kilku wątkach.

0

Witam,
W wielkich bólach stworzyłem program, który ma za zadanie pobierać dane z internetu z wykorzystaniem listy serwerów Proxy.
Muszę wykorzystać proxy ponieważ strona z której pobieram posiada limit dziennych zapytań. Ponadto nie każde proxy na liście jest aktywne.
Moduł pobierający ma w sobie zapisane sprawdzanie połączenia i w razie nie powodzenia zmianę serwera proxy na następny z listy.
Niestety mam problem ze zmianą serwera proxy na inny w momencie, gdy pracują np. 3 wątki pobierania i każdy z nich chce zmienić serwer proxy na następny.
Nie wiem jak zablokować zmianę i zaczekanie, aż jeden z wątków, który pierwszy odkrył problem, skończył zmianę na dobry proxy.

Dzięki za każdą sugestię.

3

Bezpieczna wątkowo kolejka System.Collections.Concurrent.ConcurrentQueue<T> nie rozwiązuje twojego problemu?

0
kzkzg napisał(a):

Bezpieczna wątkowo kolejka System.Collections.Concurrent.ConcurrentQueue<T> nie rozwiązuje twojego problemu?

Dzięki, zobaczę z czym to się je.
Tymczasem spróbowałem też ustawianie zmiennej True, gdy musi nastąpić zmiana Proxy i na początku metody pobierającej pętla powstrzymująca dalsze działanie, gdy nadal ta zmienna jest na True. Brzmi pewnie strasznie :-)

0

Brzmi pewnie strasznie :-)

Nie strasznie ale mało bezpiecznie. Nie zabezpieczy to przed poprzednim problemem tylko sprawi że będzie pojawiał się dużo rzadziej. Zgodnie z prawem Murphy'ego objawi się w najmniej odpowiednim momencie.

0
Delor napisał(a):

Brzmi pewnie strasznie :-)

Nie strasznie ale mało bezpiecznie. Nie zabezpieczy to przed poprzednim problemem tylko sprawi że będzie pojawiał się dużo rzadziej. Zgodnie z prawem Murphy'ego objawi się w najmniej odpowiednim momencie.

I tak też się dzieje :(
Nie będzie to łatwa przeprawa, nawet z ConcurrentQueue.
Chyba będą musiał zmienić całą koncepcję pod wielowątkowy, bo szeregowo działa bez problemu tylko wolno.

1

opakuj fragment kodu odpowiedzialny za zamianę proxy w sekcje krytyczną za pomocą lock:
https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/lock-statement
tylko jeden wątek w danym czasie będzie mógł skorzystać z opakowanego kodu

0
neves napisał(a):

opakuj fragment kodu odpowiedzialny za zamianę proxy w sekcje krytyczną za pomocą lock:
https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/lock-statement
tylko jeden wątek w danym czasie będzie mógł skorzystać z opakowanego kodu

A co w tym czasie robią pozostałe wątki?

0
neves napisał(a):

opakuj fragment kodu odpowiedzialny za zamianę proxy w sekcje krytyczną za pomocą lock:
https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/lock-statement
tylko jeden wątek w danym czasie będzie mógł skorzystać z opakowanego kodu

Wszystko byłoby ok do momentu, aż użyję sposobu na wykrywanie nieaktywnych proxy.
Timeout występuję bardzo często. Przy korzystaniu ze zwykłego łącza oraz tego sposobu także występuje Timeout, a na pewno nie powoduje to serwis z którego pobieram.
Czy możliwe, że przyczyną jest wielowątkowość oraz niedostosowana klasa AsyncExtensions?


...
    response = (HttpWebResponse)request.GetResponseAsync().WithTimeout(TimeSpan.FromSeconds(1)).Result;
....

    public static class AsyncExtensions
    {
        public static Task<T> WithTimeout<T>(this Task<T> task, TimeSpan timeout)
        {
             return Task.Factory.StartNew(() =>
            {
                try
                {
                    var b = task.Wait((int)timeout.TotalMilliseconds);
                    if (b)
                    {
                        return task.Result;
                    }
                }
                catch (Exception esx)
                {
                    throw new WebException("Connect failure", WebExceptionStatus.ConnectFailure);
                }
                throw new WebException("Timed out", WebExceptionStatus.Timeout);
            });
        }
    }
0
duzers napisał(a):
neves napisał(a):

opakuj fragment kodu odpowiedzialny za zamianę proxy w sekcje krytyczną za pomocą lock:
https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/lock-statement
tylko jeden wątek w danym czasie będzie mógł skorzystać z opakowanego kodu

A co w tym czasie robią pozostałe wątki?

Wątek czeka, ale i tak później jak się zwolni to wchodzi w sekcję krytyczną i dokonuje zmiany proxy. Jak to zablokować, gdy poprzedni wątek już to zrobił?
A najśmieszniejsze jest to, że jak skompiluje program jako Debug to program śmiga raczej bez problemów.

0
duzers napisał(a):

A najśmieszniejsze jest to, że jak skompiluje program jako Debug to program śmiga raczej bez problemów.

Tak, właśnie, śmiga w debugu. ;)

0

Aby zobrazować w czym problem wrzucam kod realizujący symulację połączenia.
Program próbuje pobrać informacje "one", "two", "three"..... i ma do dyspozycji proxy "1", "2", "3"
Wprowadziłem symulację błędu na serwerze proxy "1".
I teraz chodzi o to, żeby przy błędzie, zmiana serwera na następny wykonana została jednokrotnie.
Tak się nie dzieje, ponieważ każdy z uruchomionych wątków pobierania (maks 2) zmienia serwer proxy i tak naprawdę pomijany jest serwer "2".

Proszę kolegów o pomoc.

  class Program
    {
        private static Timer timer;                 // timer to initiate periodic data refresh and start date download on background threads
        private static Queue<string> refreshQueue;  // tickers waiting to be refreshed
        private static bool inTimer;
        private static long concurrentDownloads;    // number of concurrent data downloads running (WinINet max it in 2!)
        private static ConcurrentQueue<string> proxyQueue;
        private static string proxy;

        static void Main(string[] args)
        {
            timer = new Timer(timer_tick, null, Timeout.Infinite, Timeout.Infinite);

            refreshQueue = new Queue<string>();
            // Inserting the elements into the Queue 
            refreshQueue.Enqueue("one");
            refreshQueue.Enqueue("two");
            refreshQueue.Enqueue("three");
            refreshQueue.Enqueue("four");
            refreshQueue.Enqueue("five");
            refreshQueue.Enqueue("six");

            proxyQueue = new ConcurrentQueue<string>();
            proxyQueue.Enqueue("1");
            proxyQueue.Enqueue("2");
            proxyQueue.Enqueue("3");

            // start timer
            timer.Change(1000, 100);

            Console.ReadLine();
        }
        /// <summary>
        /// Timer event handler
        /// Start background threads to refresh enqueued tickers. It checks if there is anything in the refreshQueue and less then 3 requests are currently executing, then it starts up a threadpool thread to execute the next refresh.
        /// </summary>
        /// <param name="sender"></param>
        private static void timer_tick(object sender)
        {
            if (inTimer)
                return;

            inTimer = true;

            try
            {
                // if there are tickers enqueued and there are less then 3 parallel downloads already running
                if (refreshQueue.Count > 0 && Interlocked.Read(ref concurrentDownloads) < 2)
                {
                    // increment no of downloads
                    Interlocked.Increment(ref concurrentDownloads);

                    // dequeue the ticker and get the tickerdata for it
                    string ticker = refreshQueue.Dequeue();

                    // make an idle threadpool thread execute the download in the background
                    ThreadPool.QueueUserWorkItem(RefreshTicker, ticker);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed to process all tickers: " + ex);
            }
            finally
            {
                inTimer = false;
            }
        }

        /// <summary>
        /// Refresh data of a single ticker
        /// </summary>
        /// <param name="ticker"></param>
        private static void RefreshTicker(object ticker)
        {
            try
            {
                int response = GetWebData(0, ticker);

                // if valid response
                if (response != 0)
                {
                    Console.WriteLine("Obróbka otrzymanych danych: " + ticker);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed RefreshTicker: " + ticker + "\n" + ex);
            }
            finally
            {
                // decrement no of downloads
                Interlocked.Decrement(ref concurrentDownloads);
            }
        }//koniec RefreshTicker

        private static int GetWebData(int retry, object ticker)
        {
            string threadid = $"thred {Thread.CurrentThread.ManagedThreadId}: ";
            try
            {
                Console.WriteLine(threadid +" Refresh ticker: " + ticker);


                //zmiana serwera na następny
                if (retry >= 1)
                {
                    Console.WriteLine(threadid +" "+ticker + " Usuwam zły proxy" + "\n");
                    proxyQueue.TryDequeue(out proxy);
                    retry = 0;
                }
                else
                {
                    proxyQueue.TryPeek(out proxy);

                    Console.WriteLine(threadid +" "+ticker + " Proxy "+proxy+"\n");
                }



                if (proxy == "1")
                {
                    Thread.Sleep(3000);
                    throw new Exception();
                }
                else
                    return 1;
            }
            catch (Exception ex)
            {
                Console.WriteLine(threadid +" Failed to process " + ticker);

                return GetWebData(retry + 1, ticker);
            }

            return 0;
        }//koniec GetWebData

    }//koniec klasy Program

1 użytkowników online, w tym zalogowanych: 0, gości: 1