Home > Archive > Unix Programming > April 2005 > implement a queue as shared memory










implement a queue as shared memory
leshanster@gmail.com

2005-04-14, 8:58 pm

I am trying to figure out how to implement a queue using shared memory.
I want to do this so 2 threads can access this queue, but with mutual
exclusion.

say I wanted a queue of max size 5 integers

int seg_id = shmget(IPC_PRIVATE, 20480, S_IRUSR | S_IWUSR); // this gets my shared memory

but how do I make this shared memory a queue? could I do this:

queue<int> *shared_queue;

shared_queue = (int *) shmat(seg_id, NULL, 0);

would this work? would this allow me to still use the common queue
functions like pop, push, front?

am I way off on this?

Leshanster

David Schwartz

2005-04-14, 8:58 pm


<leshanster@gmail.com> wrote in message
news:1113507714.525015.246700@o13g2000cwo.googlegroups.com...

>I am trying to figure out how to implement a queue using shared memory.
> I want to do this so 2 threads can access this queue, but with mutual
> exclusion.


If you mean 2 threads in the same process, they already share all their
memory.

> say I wanted a queue of max size 5 integers
>
> int seg_id = shmget(IPC_PRIVATE, 20480, S_IRUSR | S_IWUSR); // this gets my shared memory
>
> but how do I make this shared memory a queue? could I do this:
>
> queue<int> *shared_queue;
>
> shared_queue = (int *) shmat(seg_id, NULL, 0);
>
> would this work? would this allow me to still use the common queue
> functions like pop, push, front?
>
> am I way off on this?


That certainly won't work. If you want to implement a queue that can be
shared by different processes, you'll have to code it along with the
necessary synchronization. If you want to implement a queue that can be
shared by threads in a process, you can just use the queue class, but you'll
need to wrap it in appropriate locks (pthread_mutex_* functions).
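
For the threads case, the locking described above can be sketched in C roughly as follows. This is only a minimal sketch, not the C++ queue class: the names locked_queue, lq_init, lq_push and lq_pop and the capacity of 5 are made up for illustration, and only the pthread_mutex_* calls come from the pthreads API.

```c
#include <pthread.h>
#include <stdbool.h>

#define LQ_CAPACITY 5   /* hypothetical capacity, matching the question */

typedef struct {
    pthread_mutex_t lock;
    int head;               /* index of the oldest element */
    int length;             /* number of stored elements */
    int data[LQ_CAPACITY];
} locked_queue;

void lq_init(locked_queue *q) {
    pthread_mutex_init(&q->lock, NULL);
    q->head = 0;
    q->length = 0;
}

/* Returns false when the queue is full. */
bool lq_push(locked_queue *q, int value) {
    bool ok = false;
    pthread_mutex_lock(&q->lock);
    if (q->length < LQ_CAPACITY) {
        q->data[(q->head + q->length) % LQ_CAPACITY] = value;
        q->length++;
        ok = true;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Returns false when the queue is empty; otherwise stores the value in *out. */
bool lq_pop(locked_queue *q, int *out) {
    bool ok = false;
    pthread_mutex_lock(&q->lock);
    if (q->length > 0) {
        *out = q->data[q->head];
        q->head = (q->head + 1) % LQ_CAPACITY;
        q->length--;
        ok = true;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}
```

Every access to head, length and data happens between the lock and unlock calls, so two threads can never be halfway through an operation at the same time. Returning false instead of blocking on a full or empty queue is a design choice of this sketch; a condition variable would be the usual way to wait.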

DS


leshanster

2005-04-14, 8:58 pm

what I am trying to do (first with threads which isn't going to work,
so I am going to have to throw away that idea), is I want to have 2
processes access shared memory. But I want the shared memory to be a
FIFO queue of size 5.

//this would get the space needed for 5 integers
int seg_id = shmget(IPC_PRIVATE, 20480, S_IRUSR | S_IWUSR);

^ but with that, how do I turn that memory space into a queue?

David Schwartz

2005-04-14, 8:58 pm


"leshanster" <leshanster@gmail.com> wrote in message
news:1113511372.710173.163330@o13g2000cwo.googlegroups.com...
> what I am trying to do (first with threads which isn't going to work,
> so I am going to have to throw away that idea), is I want to have 2
> processes access shared memory. But I want the shared memory to be a
> FIFO queue of size 5.
>
> //this would get the space needed for 5 integers
> int seg_id = shmget(IPC_PRIVATE, 20480, S_IRUSR | S_IWUSR);
>
> ^ but with that, how do I turn that memory space into a queue?


You have to write a queue class that understands shared memory and
synchronization. What do you plan to use for synchronization? Semaphores?

By the way, it's *much* easier with threads.

DS


phil_gg04@treefic.com

2005-04-14, 8:58 pm

> I want to have 2
> processes access shared memory. But I want the shared memory to be a
> FIFO queue of size 5.


You could alternatively use a pipe (man pipe) or inter-process message
queue (man msgget, msgsnd).
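
As a rough illustration of the pipe alternative, here is a sketch in which a child process writes integers into a pipe and the parent reads them back in the same order. The function name pipe_fifo_demo and the values sent are invented for the example; the kernel does the queueing and the blocking, so no explicit locking is needed.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Sends `count` ints from a child to the parent through a pipe.
 * Returns the number of ints received, or -1 on error. */
int pipe_fifo_demo(int *received, int count) {
    int fds[2];
    if (pipe(fds) == -1)
        return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    if (pid == 0) {                       /* child: the producer */
        close(fds[0]);
        for (int i = 0; i < count; i++) {
            int value = (i + 1) * 10;
            if (write(fds[1], &value, sizeof value) != (ssize_t)sizeof value)
                _exit(1);
        }
        close(fds[1]);
        _exit(0);
    }

    close(fds[1]);                        /* parent: the consumer */
    int n = 0;
    while (n < count) {
        int value;
        ssize_t got = read(fds[0], &value, sizeof value);
        if (got != (ssize_t)sizeof value) /* end of file or error */
            break;
        received[n++] = value;
    }
    close(fds[0]);
    waitpid(pid, NULL, 0);
    return n;
}
```

Calling pipe_fifo_demo(out, 5) with an array of five ints should fill it with 10, 20, 30, 40, 50 in that order.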

Using shared memory, the shared region may exist at different addresses
in the different processes. So you can't use pointers. This almost
certainly means that you can't use the C++ STL queue classes, or
probably any C++. Stick to a simple struct like this:

typedef struct {
int in_pos;
int out_pos;
base_type data[5];
} queue;

Declare this as volatile.

Next you need to worry about locking. In the case of a fifo, I think
that it is sufficient to use atomic assignment when changing in_pos and
out_pos. The GNU libc documentation says:

In practice, you can assume that `int' and other integer types no
longer than `int' are atomic. You can also assume that pointer types
are atomic; that is very convenient. Both of these assumptions are
true on all of the machines that the GNU C library supports and on all
POSIX systems we know of.

If you don't believe them, or need a more complex data structure, a
more heavyweight lock would be needed.

Then write push(), pop(), full(), empty() etc. in the obvious way.
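
The in_pos/out_pos struct could be fleshed out for exactly one producer and one consumer roughly like this. Note the swap: this sketch uses C11 <stdatomic.h> with acquire/release ordering rather than volatile and plain int assignment, because volatile by itself does not order the write to the data slot against the update of the index on every CPU. It assumes a C11 compiler and that atomic_uint is lock-free on the platform, so that it can live in a shared segment. It also spends one extra slot to tell a full queue from an empty one. The ring_* names are invented for illustration.

```c
#include <stdatomic.h>
#include <stdbool.h>

#define RING_SLOTS 6   /* one slot always stays empty, so this holds 5 items */

typedef struct {
    atomic_uint in_pos;    /* written only by the producer */
    atomic_uint out_pos;   /* written only by the consumer */
    int data[RING_SLOTS];
} ring_queue;

void ring_init(ring_queue *q) {
    atomic_init(&q->in_pos, 0);
    atomic_init(&q->out_pos, 0);
}

bool ring_empty(ring_queue *q) {
    return atomic_load(&q->in_pos) == atomic_load(&q->out_pos);
}

bool ring_full(ring_queue *q) {
    return (atomic_load(&q->in_pos) + 1) % RING_SLOTS == atomic_load(&q->out_pos);
}

/* Producer side only. Returns false if the queue is full. */
bool ring_push(ring_queue *q, int value) {
    unsigned in = atomic_load_explicit(&q->in_pos, memory_order_relaxed);
    unsigned next = (in + 1) % RING_SLOTS;
    if (next == atomic_load_explicit(&q->out_pos, memory_order_acquire))
        return false;
    q->data[in] = value;
    /* Release: the slot is written before the new in_pos becomes visible. */
    atomic_store_explicit(&q->in_pos, next, memory_order_release);
    return true;
}

/* Consumer side only. Returns false if the queue is empty. */
bool ring_pop(ring_queue *q, int *out) {
    unsigned pos = atomic_load_explicit(&q->out_pos, memory_order_relaxed);
    if (pos == atomic_load_explicit(&q->in_pos, memory_order_acquire))
        return false;
    *out = q->data[pos];
    /* Release: the slot is read before it is handed back to the producer. */
    atomic_store_explicit(&q->out_pos, (pos + 1) % RING_SLOTS, memory_order_release);
    return true;
}
```

Because the struct contains only indexes and an array, never pointers, it does not matter that the segment is mapped at different addresses in the two processes. With more than one producer or more than one consumer this is no longer safe and a real lock is needed.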

--Phil.

Darius

2005-04-15, 3:59 pm


leshanster wrote:
> what I am trying to do (first with threads which isn't going to work,
> so I am going to have to throw away that idea), is I want to have 2
> processes access shared memory. But I want the shared memory to be a
> FIFO queue of size 5.


You can still use threads. That way you do not need shared memory: you
can use a global variable to implement the queue (for example as a
linked list). Take care to use a mutex to avoid race conditions.
>
> //this would get the space needed for 5 integers
> int seg_id = shmget(IPC_PRIVATE, 20480, S_IRUSR | S_IWUSR);
>
> ^ but with that, how do I turn that memory space into a queue?


leshanster

2005-04-15, 3:59 pm

Is there a way then I could just use the shared memory as an integer array?

Something like:

int* front;

front = tshmat(shmid, 0, 0);

then could I just use

front[0] = 3;

Would this work?

Pascal Bourguignon

2005-04-15, 3:59 pm

"leshanster" <leshanster@gmail.com> writes:

> Is there a way then I could just use the shared memory as an integer array?
>
> Something like:
>
> int* front;
>
> front = tshmat(shmid, 0, 0);
>
> then could I just use
>
> front[0] = 3;
>
> Would this work?


You've not been reading what David and Darius have written, have you?


You must distinguish threads and processes.

Threads in a process use all the same memory. If you only have threads
in one process, you don't need shared memory. A simple global
variable will do.


If you have several processes, each process has its own memory, so
you need shared memory to communicate between these processes.


In both cases, while you'll be able to read and write the memory, your
queue abstract data type won't work if you don't synchronize accesses.

Let's take the case of threads, with the following queue:

#define qsize (5)
typedef struct {
    int head;    /* index of the oldest element */
    int length;  /* number of elements stored */
    int data[qsize];
} queue_t;

void queue_init(queue_t* q){
    q->head=0;
    q->length=0; }

void enqueue(queue_t* q,int value){
    if(q->length<qsize){
        q->data[(q->head+q->length)%qsize]=value;
        q->length++; }
    else{
        error("queue is full"); }}

/* error() is assumed to report the problem and not return. */
int dequeue(queue_t* q){
    if(q->length==0){
        error("queue is empty"); }
    else{
        int result=q->data[q->head];
        q->head++;
        if(q->head==qsize){ q->head=0; } /* wrap around the end of data[] */
        q->length--;
        return(result); }}

queue_t* q=shmat(id,0,0);

Now, imagine q->head==0 and q->length==0, and thread 1 runs:

enqueue(q,1);

that is, it executes:

if(q->length<qsize){

and:

q->data[(q->head+q->length)%qsize]=value;

Now, q->head=0, q->length=0, and q->data[0]=1

Imagine now the OS decides to switch to thread 2, and thread 2 runs:

enqueue(q,2);

that is, it executes:

if(q->length<qsize){

and:

q->data[(q->head+q->length)%qsize]=value;
q->length++; }

Now, q->head=0, q->length=1, q->data[0]=2. If thread 2 continues with:

int v=dequeue(q);

it gets v=2 instead of v=1, and q->head=1, q->length=0

If now the OS decides to give back the processor to thread 1, it resumes:

q->length++; }

and now q->head=1, q->length=1 and q->data[1]= whatever garbage was
there. If thread 1 continues with:

int w=dequeue(q);

it gets w=garbage instead of w=2, and q->head=2, q->length=0.




When we say that you need to consider synchronisation, it means that
you must ensure that when you start an enqueue or a dequeue operation,
no matter what happens, no other thread (or process) will be able
to modify the same queue until the enqueue or dequeue operation is
finished. Access to these operations must be mutually exclusive, that
is, no two threads (or processes) must be able to access these
operations at the same time. Hence the need to use a mutex in the case
of threads, or semaphores in the case of processes.
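
For the processes case, the mutual exclusion described above can be sketched with a System V semaphore (semget/semop) used as a lock around data in a shared memory segment. To keep the sketch short the protected data is a single counter rather than the whole queue; the same lock and unlock calls would go around the bodies of enqueue and dequeue. The names sem_change and locked_counter_demo are invented, and defining union semun by hand assumes Linux, where the headers leave that to the caller.

```c
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Assumption: Linux, where the caller has to define this union for semctl(). */
union semun { int val; struct semid_ds *buf; unsigned short *array; };

/* Adds delta to semaphore 0: -1 locks (blocking while it is 0), +1 unlocks. */
static int sem_change(int sem_id, short delta) {
    struct sembuf op = { .sem_num = 0, .sem_op = delta, .sem_flg = SEM_UNDO };
    return semop(sem_id, &op, 1);
}

/* Parent and child each add 1 to a shared counter `rounds` times.
 * Returns the final counter value, or -1 if setup failed. */
int locked_counter_demo(int rounds) {
    int result = -1;
    int shm_id = shmget(IPC_PRIVATE, sizeof(int), IPC_CREAT | S_IRUSR | S_IWUSR);
    int sem_id = semget(IPC_PRIVATE, 1, IPC_CREAT | S_IRUSR | S_IWUSR);
    void *mem = (shm_id == -1) ? (void *)-1 : shmat(shm_id, NULL, 0);
    union semun arg = { .val = 1 };                /* 1 means "unlocked" */

    if (sem_id != -1 && mem != (void *)-1 &&
        semctl(sem_id, 0, SETVAL, arg) != -1) {
        volatile int *counter = mem;
        *counter = 0;
        pid_t pid = fork();
        if (pid != -1) {
            for (int i = 0; i < rounds; i++) {     /* runs in parent and child */
                if (sem_change(sem_id, -1) == -1) break;   /* lock */
                *counter = *counter + 1;                   /* critical section */
                if (sem_change(sem_id, +1) == -1) break;   /* unlock */
            }
            if (pid == 0) _exit(0);
            waitpid(pid, NULL, 0);
            result = *counter;
        }
    }
    /* Release whatever was acquired. */
    if (mem != (void *)-1) shmdt(mem);
    if (shm_id != -1) shmctl(shm_id, IPC_RMID, NULL);
    if (sem_id != -1) semctl(sem_id, 0, IPC_RMID);
    return result;
}
```

With the semaphore in place, locked_counter_demo(1000) should return 2000. Without the two sem_change calls the read-modify-write of the counter can interleave between the processes, in the same way as the enqueue example above, and updates can be lost.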


So, go read:

man pthread_mutex_init
man pthread_mutex_lock
man pthread_mutex_unlock
man semget
man semop

and come back after.

--

__Pascal Bourguignon__ http://www.informatimago.com/
phil_gg04@treefic.com

2005-04-15, 8:58 pm

> Is there a way then I could just [use] the shared memory as an
> integer array?
> Something like:
>
> int* front;
> front = tshmat(shmid, 0, 0);
> then could I just use
> front[0] = 3;
> Would this work?


(I think you mean shmat, not tshmat.)

Yes, this will work.

--Phil.

phil_gg04@treefic.com

2005-04-15, 8:58 pm

> Let's take the case of threads,

Leshanster's question is about processes using shared memory, not
threads.

> when you start an enqueue or a dequeue operation,
> not matter what arrives, no other thread (or processes) will be able
> to modify the same queue until the enqueue or dequeue operation is
> finished.


Why not? If I have a queue that contains "A B C D", and I am in the
process of removing "A" from the front, why shouldn't another thread
simultaneously be adding "E" to the end? Storing input and output
positions, rather than head and length as you suggest, makes this
possible.

--Phil.

phil_gg04@treefic.com

2005-04-15, 8:58 pm

>> when you start an enqueue or a dequeue operation,
> Why not? If I have a queue that contains "A B C D", and I am in the
> process of removing "A" from the front, why shouldn't another thread
> simultaneously be adding "E" to the end?


I should of course say that locking is necessary when there is more
than one process or thread trying to write to the queue and/or more
than one trying to read. The common case is where exactly one process
is reading and exactly one is writing; it is in this case that locking
can be reduced to atomic operations on the position counters. Sorry
for pressing "post" too quickly.

--Phil.

Pascal Bourguignon

2005-04-16, 3:58 am

phil_gg04@treefic.com writes:

>
> Leshanster's question is about processes using shared memory, not
> threads.
>
>
> Why not? If I have a queue that contains "A B C D", and I am in the
> process of removing "A" from the front, why shouldn't another thread
> simultaneously be adding "E" to the end? Storing input and output
> positions, rather than head and length as you suggest, makes this
> possible.


Because the operations enqueue and dequeue are composed of several
operations working on the same fields, and if you let them run in a
random order, you can get random bugs.

You've cut too much to see it, so I'll have to paste it again:


#define qsize (5)
typedef struct {
    int head;    /* index of the oldest element */
    int length;  /* number of elements stored */
    int data[qsize];
} queue_t;

void queue_init(queue_t* q){
    q->head=0;
    q->length=0; }

void enqueue(queue_t* q,int value){
    if(q->length<qsize){
        q->data[(q->head+q->length)%qsize]=value;
        q->length++; }
    else{
        error("queue is full"); }}

/* error() is assumed to report the problem and not return. */
int dequeue(queue_t* q){
    if(q->length==0){
        error("queue is empty"); }
    else{
        int result=q->data[q->head];
        q->head++;
        if(q->head==qsize){ q->head=0; } /* wrap around the end of data[] */
        q->length--;
        return(result); }}

Let's start with a queue such as:
q->head=1
q->length=1
q->data[1]=42;

thread or process 1 does:
int v=dequeue(q)

i.e.:

if(q->length==0){
int result=q->data[q->head];
q->head++;

at which point result=42, q->head=2, q->length=1.
Then the OS switches to thread or process 2 which does:

enqueue(q,2)
i.e.:
if(q->length<qsize){
q->data[(q->head+q->length)%qsize]=value;

this sets q->data[3] to 2, instead of q->data[2]!

q->length++; }

at this point, q->head=2, q->length=2, q->data[2]=garbage, q->data[3]=2
Then the OS switches to thread or process 1 which does:

q->length--;
return(result); }}

at which point q->head=2, q->length=1, q->data[2]=garbage, v=42.



Using head and tail, you either lose one position (to distinguish a
full queue from an empty queue) or you have to keep at least one extra
boolean. Anyway, in this case, you can still get problems if you have
several readers or several writers.

--
__Pascal Bourguignon__ http://www.informatimago.com/

In a World without Walls and Fences,
who needs Windows and Gates?
leshanster

2005-04-17, 8:57 pm

Let me show you what I have so far. This is kind of different, but I
am trying to learn all of this. Right now, I am just trying to create
a shared memory space, that I can use in 2 processes. I have 2 files,
process1 and process2. Both compile, but when I run process1, I get an
error that says "Bus Error (Core Dumped)"

process1:
----
#include <stdio.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int main(){

int seg_id;
int forkID;

int* shared_mem;

const int size = sizeof(int) * 4;

int id = 250;

seg_id = shmget(id, size, S_IRUSR | S_IWUSR);

shared_mem = (int *) shmat(seg_id, NULL, 0);

forkID = fork();

if(forkID == 0){

//call execl
execl("test2", (char *) id, (char *) 0);
}
else{

wait(NULL);

printf("%d\n",shared_mem[0]);
printf("%d\n",shared_mem[1]);
printf("%d\n",shared_mem[2]);
printf("%d\n",shared_mem[3]);
}

shmdt(shared_mem);

shmctl(seg_id, IPC_RMID, NULL);

return 0;

}
-------
process2
---
#include <stdio.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

int main(int argc, char *argv[]){

int* front;
int size = sizeof(int) * 4;

int seg_id = shmget(atoi(argv[1]), size, S_IRUSR | S_IWUSR);

front = (int *) shmat(seg_id, NULL, 0);

printf("I am here!\n");

front[0] = 1;
front[1] = 2;
front[2] = 3;
front[3] = 4;

return 0;
}
---


Thanks guys!

phil_gg04@treefic.com

2005-04-17, 8:57 pm

> when I run process1, I get an error that says "Bus Error (Core
> Dumped)"

You need to add lots of error checking to this code. Just about every
library function you call has some way of reporting an error. You need
to detect these and do something (e.g. print out the error message).
For example, you have

seg_id = shmget(id, size, S_IRUSR | S_IWUSR);

You need to follow that with something like this:

if (seg_id==-1) {
perror("shmget()");
exit(1);
}
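
Applied to the whole program, that kind of checking might look like the sketch below. It is not a fix of the posted files: it creates a private segment with IPC_CREAT, forks instead of calling execl, and the name checked_shm_demo is invented. Two details are worth noting: shmat reports failure by returning (void *)-1 rather than NULL, and writing through that value is one plausible way to end up with a crash like the one reported.

```c
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SLOTS 4

/* Creates a segment, lets a child fill it with 1..SLOTS, and returns the
 * sum the parent reads back, or -1 if any step failed. */
int checked_shm_demo(void) {
    int seg_id = shmget(IPC_PRIVATE, SLOTS * sizeof(int),
                        IPC_CREAT | S_IRUSR | S_IWUSR);
    if (seg_id == -1) {
        perror("shmget");
        return -1;
    }

    int *shared = shmat(seg_id, NULL, 0);
    if (shared == (void *)-1) {          /* failure is (void *)-1, not NULL */
        perror("shmat");
        shmctl(seg_id, IPC_RMID, NULL);
        return -1;
    }

    int sum = -1;
    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
    } else if (pid == 0) {               /* child: inherits the attachment */
        for (int i = 0; i < SLOTS; i++)
            shared[i] = i + 1;
        _exit(0);
    } else {                             /* parent: wait, then read */
        int status;
        if (waitpid(pid, &status, 0) == -1) {
            perror("waitpid");
        } else if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
            sum = 0;
            for (int i = 0; i < SLOTS; i++)
                sum += shared[i];
        }
    }

    if (shmdt(shared) == -1)
        perror("shmdt");
    if (shmctl(seg_id, IPC_RMID, NULL) == -1)
        perror("shmctl");
    return sum;
}
```

The parent only reads after waitpid has returned, so in this sketch the wait itself provides the synchronisation; two processes running at the same time would still need a lock.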

--Phil.

Pascal Bourguignon

2005-04-17, 8:57 pm

"leshanster" <leshanster@gmail.com> writes:

> Let me show you what I have so far. This is kind of different, but I
> am trying to learn all of this. Right now, I am just trying to create
> a shared memory space, that I can use in 2 processes. I have 2 files,
> process1 and process2. Both compile, but when I run process1, I get an
> error that says "Bus Error (Core Dumped)"
> [...]
> int id = 250;
>
> seg_id = shmget(id, size, S_IRUSR | S_IWUSR);
>
> shared_mem = (int *) shmat(seg_id, NULL, 0);
>
> forkID = fork();
>
> if(forkID == 0){
>
> //call execl
> execl("test2", (char *) id, (char *) 0);


id is not a string.

--
__Pascal_Bourguignon__ _ Software patents are endangering
() ASCII ribbon against html email (o_ the computer industry all around
/\ 1962:DO20I=1.100 //\ the world http://lpf.ai.mit.edu/
2001:my($f)=`fortune`; V_/ http://petition.eurolinux.org/
leshanster

2005-04-17, 8:57 pm

don't I have to cast it into a character array so it can be used in the
argv[] in the other process?

phil_gg04@treefic.com

2005-04-17, 8:57 pm

>> id is not a string.

Well spotted Pascal.

> don't I have to cast it into a character array so it can be used in
> the argv[] in the other process?


You need to convert it, but you can't just cast it. You are asking the
processor to pass the string that starts at address 250 in its memory,
which could be any old rubbish or be unreadable memory. Use sprintf()
with a "%d" format, or similar, to write the number into a character
buffer and pass that buffer instead.

You do the right thing at the other end where you use atoi.
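
A sketch of that conversion, using snprintf so the buffer cannot overflow. The helper names key_to_string and exec_with_key are invented for illustration. One extra detail the sketch assumes matters here: the first argument after the path in execl becomes argv[0] in the new program, so the key has to come after it in order to arrive as argv[1], which is what atoi(argv[1]) on the other side expects.

```c
#include <stdio.h>
#include <unistd.h>

/* Formats id as decimal text. Returns 0 on success, -1 if buf is too small. */
int key_to_string(int id, char *buf, size_t size) {
    int n = snprintf(buf, size, "%d", id);
    return (n < 0 || (size_t)n >= size) ? -1 : 0;
}

/* Replaces the current process with the program at `path`, passing id as
 * argv[1]. Only returns (with -1) if something failed. */
int exec_with_key(const char *path, int id) {
    char text[16];                       /* enough for any 32-bit int */
    if (key_to_string(id, text, sizeof text) == -1)
        return -1;
    /* path is repeated as argv[0]; the key follows as argv[1]. */
    execl(path, path, text, (char *)NULL);
    perror("execl");
    return -1;
}
```

In the child branch after fork, something like exec_with_key("./test2", id) would then take the place of the cast.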

--Phil.


Copyright 2008 codecomments.com