1 //==========================================================================
3 // tests/multi_lo_test.c
5 // Multiple selects-at-one-time test, using lo for portability.
7 //==========================================================================
8 //####BSDCOPYRIGHTBEGIN####
10 // -------------------------------------------
12 // Portions of this software may have been derived from OpenBSD or other sources
13 // and are covered by the appropriate copyright disclaimers included herein.
15 // -------------------------------------------
17 //####BSDCOPYRIGHTEND####
18 //==========================================================================
19 //#####DESCRIPTIONBEGIN####
21 // Author(s): sorin@netappi.com, hmt
22 // Contributors: gthomas,sorin@netappi.com
26 // This test is to test that the internal producer operation to select
27 // truly has broadcast semantics; there was a bug in there whereby it
28 // doesn't, so events get lost and/or the wrong thread is awakened.
30 // We need to create N threads selecting on different sockets
31 // (different ports) (including one or two selecting on several
32 // sockets) and have a further thread or threads which send data to
33 // the those sockets in an order "random" with respect to the order in
34 // which the N selectors entered the wait, and their thread
37 // If this all works, then we know that select always wakes the right
38 // thread in the right order. I think...
40 // I think 10 threads 0-9 where #2,#3,#6,#7 wait for multiple threads
41 // will do it. #0-4 will be prio HI, #5-9 will be prio LO. Sender
42 // thread A at prio MID will send to sockets 1,3,5,7,9. Sender thread
43 // B at prio LOWEST will send to sockets 0,2,4,6,8.
45 // Each sender thread will wait for a different semaphore signal
46 // before doing their next send, thus confirming correct ordering.
47 // Two common semaphores will also be signalled, one when a send
48 // occurs, the other when a recv happens.
50 // The master thread will start off VERYHIGHPRI, then drop after
51 // starting all the others, to VERYLOW... when it next runs, those
52 // common semaphores should both have value 10 == NLISTENERS.
55 //#####DESCRIPTIONEND#####
57 //==========================================================================
61 #include <cyg/infra/testcase.h>
63 #ifndef CYGPKG_LIBC_STDIO
64 #define perror(s) diag_printf(#s ": %s\n", strerror(errno))
67 #define SOURCE_PORT1 9900
68 #define SOURCE_PORT2 9800 // for those who listen to multiple ports
69 #define SOURCE_PORT3 9700 // for the dummy consumers of events
71 #define PRIO_DUMMY 4 // Really high, so they're always right back there
73 #define PRIO_LISTENER_HI 10
74 #define PRIO_LISTENER_LO 15
76 #define PRIO_SENDER_MID 12
77 #define PRIO_SENDER_LOW 17
79 #define PRIO_MASTERHIGH 3
80 #define PRIO_MASTERLOW 25
82 #ifndef CYGPKG_IO_FILEIO
83 #if CYGPKG_IO_NFILE > 30
86 // fewer threads if not many sockets available
87 #define NLISTENERS (CYGPKG_IO_NFILE/3)
90 #include <pkgconf/io_fileio.h>
91 #if CYGNUM_FILEIO_NFD > 30
94 // fewer threads if not many sockets available
95 #define NLISTENERS (CYGNUM_FILEIO_NFD/3)
103 #define NUM_BUF NLISTENERS
106 // buffers for receiving into:
107 static unsigned char data_buf1[NUM_BUF][MAX_BUF];
109 static unsigned char data_buf_write1[]="Client is alive";
111 #define STACK_SIZE (CYGNUM_HAL_STACK_SIZE_TYPICAL)
112 #define MASTER_STACK_SIZE (CYGNUM_HAL_STACK_SIZE_TYPICAL + 0x1000)
114 static char stack_master[MASTER_STACK_SIZE];
115 static cyg_thread master_thread_data;
116 static cyg_handle_t master_thread_handle;
118 static char stack_dummy[NDUMMIES][STACK_SIZE];
119 static cyg_thread dummy_thread_data[NDUMMIES];
120 static cyg_handle_t dummy_thread_handle[NDUMMIES];
122 static char stack_listener[NLISTENERS][STACK_SIZE];
123 static cyg_thread listener_thread_data[NLISTENERS];
124 static cyg_handle_t listener_thread_handle[NLISTENERS];
126 static char stack_sender[NSENDERS][STACK_SIZE];
127 static cyg_thread sender_thread_data[NSENDERS];
128 static cyg_handle_t sender_thread_handle[NSENDERS];
130 static cyg_sem_t listen_sema[NLISTENERS];
132 static cyg_sem_t send_sema;
133 static cyg_sem_t recv_sema;
135 static cyg_thread_entry_t master;
136 static cyg_thread_entry_t listener;
137 static cyg_thread_entry_t sender;
139 // ------------------------------------------------------------------------
144 CYG_TEST_FAIL_FINISH( s );
149 #define max(a,b) (((a) > (b)) ? (a) : (b))
152 // ------------------------------------------------------------------------
154 void dummy( cyg_addrword_t which )
156 // Share the same socket... we appear to run out otherwise.
157 static int s_s1 = -1;
158 static struct sockaddr_in local;
164 CYG_TEST_CHECK( 0 <= which, "which under" );
165 CYG_TEST_CHECK( NDUMMIES > which, "which over" );
167 diag_printf( "Dummy %d alive\n", which );
170 s_s1 = socket(AF_INET, SOCK_STREAM, 0);
172 pexit("stream socket 1");
174 memset(&local, 0, sizeof(local));
175 local.sin_family = AF_INET;
176 local.sin_len = sizeof(local);
177 local.sin_port = ntohs(SOURCE_PORT3 + which);
178 local.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
179 if(bind(s_s1, (struct sockaddr *) &local, sizeof(local)) < 0) {
180 pexit("dummy bind /source_1/ error");
182 listen(s_s1, SOMAXCONN);
187 FD_SET(s_s1, &in_fds);
188 num = select( s_s1+1, &in_fds,0,0,0);
190 if (FD_ISSET(s_s1,&in_fds)) {
191 CYG_TEST_FAIL( "Activity on dummy port!" );
196 // ------------------------------------------------------------------------
198 void listener( cyg_addrword_t which )
200 int s_s1 = -1, e_s1 = 0, s_s2 = -1, e_s2 = 0;
201 struct sockaddr_in e_s1_addr,e_s2_addr,local;
206 // do we select on multiple sources?
207 int dual = (3 == (which & 3)) || (2 == (which & 3));
208 // then which is 2,3,6,7 so set up a 2nd listener
210 CYG_TEST_CHECK( 0 <= which, "which under" );
211 CYG_TEST_CHECK( NLISTENERS > which, "which over" );
213 diag_printf( "Listener %d alive [%s]\n", which, dual ? "dual" : "single" );
215 s_s1 = socket(AF_INET, SOCK_STREAM, 0);
217 pexit("stream socket 1");
219 memset(&local, 0, sizeof(local));
220 local.sin_family = AF_INET;
221 local.sin_len = sizeof(local);
222 local.sin_port = ntohs(SOURCE_PORT1 + which);
223 local.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
224 if(bind(s_s1, (struct sockaddr *) &local, sizeof(local)) < 0) {
225 pexit("bind /source_1/ error");
227 listen(s_s1, SOMAXCONN);
230 s_s2 = socket(AF_INET, SOCK_STREAM, 0);
232 pexit("stream socket 2");
234 memset(&local, 0, sizeof(local));
235 local.sin_family = AF_INET;
236 local.sin_len = sizeof(local);
237 local.sin_port = ntohs(SOURCE_PORT2 + which);
238 local.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
239 if(bind(s_s2, (struct sockaddr *) &local, sizeof(local)) < 0) {
240 pexit("bind /source_2/ error");
242 listen(s_s2, SOMAXCONN);
247 FD_SET(s_s1, &in_fds);
249 FD_SET(s_s2, &in_fds);
250 num = select ( max(s_s1,s_s2)+1, &in_fds,0,0,0);
252 if (FD_ISSET(s_s1,&in_fds)) {
253 len = sizeof(e_s1_addr);
254 if ((e_s1 = accept(s_s1,(struct sockaddr *)&e_s1_addr,&len))<0) {
255 pexit("accept /source_1/");
257 diag_printf("TCP SERVER connection from %s: %d\n",
258 inet_ntoa(e_s1_addr.sin_addr),ntohs(e_s1_addr.sin_port));
262 if (FD_ISSET(s_s2,&in_fds)) {
263 len = sizeof(e_s2_addr);
264 if ((e_s2 = accept(s_s2,(struct sockaddr *)&e_s2_addr,&len))<0) {
265 pexit("accept /source_2/");
267 diag_printf("TCP SERVER connection from %s: %d\n",
268 inet_ntoa(e_s2_addr.sin_addr), ntohs(e_s2_addr.sin_port));
271 if ((e_s1 != 0) || ( e_s2 != 0)) {
276 CYG_TEST_CHECK( 0 != e_s1, "No connection made on s1!" );
278 if ((len = read(e_s1, data_buf1[which], MAX_BUF)) < 0 ) {
279 perror("I/O error s1");
280 CYG_TEST_FAIL( "Read s1 failed" );
282 diag_printf("Listener %d: %s\n", which, data_buf1[which]);
292 cyg_semaphore_post( &listen_sema[which] ); // Verify that I was here
293 cyg_semaphore_post( &recv_sema ); // Count receptions
295 cyg_thread_exit(); // explicitly
298 // ------------------------------------------------------------------------
299 static void sender( cyg_addrword_t which ) // which means which set (odd/even) here...
302 struct sockaddr_in local;
305 diag_printf("client %d [%s] :started\n", which, (which & 1) ? "odd" : "even" );
307 for ( /* which as is */; which < NLISTENERS; which += 2 ) {
309 s_source = socket(AF_INET, SOCK_STREAM, 0);
311 pexit("stream socket");
313 memset(&local, 0, sizeof(local));
314 local.sin_family = AF_INET;
315 local.sin_len = sizeof(local);
316 local.sin_port = htons( SOURCE_PORT1 + which );
317 local.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
319 if (connect(s_source, (struct sockaddr *)&local, sizeof(local)) < 0) {
320 pexit("Can't connect to target");
323 if ((len = write(s_source,data_buf_write1,sizeof(data_buf_write1) )) < 0) {
324 CYG_TEST_FAIL_FINISH("Error writing buffer");
326 cyg_semaphore_wait( &listen_sema[which] ); // wait for the appropriate semaphore "reply"
327 cyg_semaphore_post( &send_sema ); // count up successful sends
331 cyg_thread_exit(); // explicitly
336 master(cyg_addrword_t param)
339 cyg_handle_t self = cyg_thread_self();
341 cyg_semaphore_init( &send_sema, 0 );
342 cyg_semaphore_init( &recv_sema, 0 );
344 for ( i = 0 ; i < NLISTENERS; i++ )
345 cyg_semaphore_init( &listen_sema[i], 0 );
347 init_all_network_interfaces();
348 CYG_TEST_INFO("Start multiple loopback select test");
350 // We are currently running at high prio, so we can just go and make
353 // Some at higher prio
354 for ( i = 0; i < NLISTENERS/2; i++ )
355 cyg_thread_create(PRIO_LISTENER_HI, // Priority
357 i, // entry parameter
359 &stack_listener[i][0], // Stack
361 &listener_thread_handle[i], // Handle
362 &listener_thread_data[i] // Thread data structure
364 // the rest at lower prio
365 for ( ; i < NLISTENERS ; i++ )
366 cyg_thread_create(PRIO_LISTENER_LO, // Priority
368 i, // entry parameter
370 &stack_listener[i][0], // Stack
372 &listener_thread_handle[i], // Handle
373 &listener_thread_data[i] // Thread data structure
376 // make the dummy event-grabber threads
377 for ( i = 0; i < NDUMMIES; i++ )
378 cyg_thread_create(PRIO_DUMMY, // Priority
380 i, // entry parameter
382 &stack_dummy[i][0], // Stack
384 &dummy_thread_handle[i], // Handle
385 &dummy_thread_data[i] // Thread data structure
388 // Start those threads
389 for ( i = 0; i < NLISTENERS; i++ )
390 cyg_thread_resume(listener_thread_handle[i]);
391 for ( i = 0; i < NDUMMIES; i++ )
392 cyg_thread_resume( dummy_thread_handle[i]);
394 // and let them start up and start listening...
395 cyg_thread_set_priority( self, PRIO_MASTERLOW );
396 CYG_TEST_INFO("All listeners should be go now");
397 cyg_thread_set_priority( self, PRIO_MASTERHIGH );
399 for ( i = 0; i < NSENDERS; i++ ) {
400 cyg_thread_create( (0 == i)
402 : PRIO_SENDER_LOW, // Priority
404 i, // entry parameter
406 &stack_sender[i][0], // Stack
408 &sender_thread_handle[i], // Handle
409 &sender_thread_data[i] // Thread data structure
411 cyg_thread_resume(sender_thread_handle[i]);
414 // Now we are still higher priority; so go low and let everyone else
415 // have their head. When we next run after this, it should all be
417 cyg_thread_set_priority( self, PRIO_MASTERLOW );
419 cyg_semaphore_peek( &recv_sema, &i );
420 CYG_TEST_CHECK( NLISTENERS == i, "Not enough recvs occurred!" );
422 cyg_semaphore_peek( &send_sema, &i );
423 CYG_TEST_CHECK( NLISTENERS == i, "Not enough sends occurred!" );
425 CYG_TEST_PASS_FINISH("Master returned OK");
427 CYG_TEST_NA( "No loopback devs" );
435 cyg_thread_create(PRIO_MASTERHIGH, // Priority
437 0, // entry parameter
439 &stack_master[0], // Stack
440 MASTER_STACK_SIZE, // Size
441 &master_thread_handle, // Handle
442 &master_thread_data // Thread data structure
444 cyg_thread_resume(master_thread_handle); // Start it
447 // EOF multi_lo_select.c