Tensorflow能通过多线程共享内存不加锁让多个线程过一个网络吗

生成检查点文件(chekpoint file),扩展名.ckpt,tf.train.Saver对象调用Saver.save()生成。包含权重和其他程序定义变量,不包含图结构。另一程序使用,需要重新创建图形结构,告诉TensorFlow如何处理权重。
生成图协议文件(graph proto file),二进制文件,扩展名.pb,tf.tran.write_graph()保存,只包含图形结构,不包含权重,tf.import_graph_def加载图形。
模型存储,建立一个tf.train.Saver()保存变量,指定保存位置,扩展名.ckpt。
神经网络,两个全连接层和一个输出层,训练MNIST数据集,存储训练好的模型。
加载数据、定义模型:
mnist = input_data.read_data_sets("MNIST_data/",one_hot=True)
trX,trY,teX,teY = mnist.train.images,mnist.train.labels,mnist.test.images,mnist.test.labels
X = tf.placeholder("float",[None,784])
Y = tf.placeholder("float",[None,10])
#初始化权重参数
w_h = init_weights([784,625])
w_h2 = init_weights([625,625])
w_o = int_weights([625,10])
#定义权重函数
def init_weights(shape):
return tf.Variable(tf.random_normal(shape,stddev=0.01))
def model(X,w_h,w_h2,w_o,p_keep_input,p_keep_hidden):
#第一个全连接层
X = tf.nn.dropout(X,p_keep_input)
h = tf.nn.relu(tf.matmul(X,w_h))
h = tf.nn.dropout(h,p_keep,hidden)
#第二个全连接层
h2 = tf.nn.relu(tf.matmul(h,w_h2))
h2 = tf.nn.dropout(h2,p_keep_hidden)
return tf.matmul(h2,w_o)#输出预测值
#定义损失函数
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(py_x,Y))
train_op = tf.train.RMSPropOptimizer(0.001,0.9).minimize(cost)
predict_op = tf.argmax(py_x,1)
训练模型及存储模型:
#定义存储路径
ckpt_dir = "./ckpt_dir"
if not os.path.exists(ckpt_dir):
os.makedirs(ckpt_dir)
定义计数器,为训练轮数计数:
#计数器变量,设置它的trainable=False,不需要被训练
global_step = tf.Variable(0,name='global_step',trainable=False)
定义完所有变量,tf.train.Saver()保存、提取变量,后面定义变量不被存储:
#声明完所有变量,调tf.train.Saver
saver = tf.train.Saver()
#之后变量不被存储
non_storable_variable = tf.Variable(777)
训练模型并存储:
with tf.Session() as sess:
tf.initialize_all_variables().run()
start = global_step.eval() #得到global_stepp初始值
print("Start from:",start)
for i in range(start,100):
#128 batch_size
for start,end in zip(range(0,len(trX),128),range(128,len(trX)+1,128)):
sess.run(train_op,feed_dict={X:trX[start:end],Y:trY[start:end],p_keep_input:0.8, p_keep_hidden:0.5})
global_step.assign(i).eval() #更新计数器
saver.save(sess,ckpt_dir+"/model.ckpt",global_step=global_step) #存储模型
训练过程,ckpt_dir出现16个文件,5个model.ckpt-{n}.data00000-of-00001文件,保存的模型。5个model.ckpt-{n}.meta文件,保存的元数据。TensorFlow默认只保存最近5个模型和元数据,删除前面没用模型和元数据。5个model.ckpt-{n}.index文件。{n}代表迭代次数。1个检查点文本文件,保存当前模型和最近5个模型。
将之前训练参数保存下来,在出现意外状竞接着上一次地方开始训练。每个固定轮数在检查点保存一个模型(.ckpt文件),随时将模型拿出来预测。
加载模型。
saver.restore加载模型:
with tf.Session() as sess:
tf.initialize_all_variables().run()
ckpt = tf.train.get_checkpoint_state(ckpt_dir)
if ckpt and ckpt.model_checkpoint_path:
print(ckpt.model_checkpoint_path)
saver.restore(sess,ckpt.model_checkpoint_path) #加载所有参数
图存储加载。
仅保存图模型,图写入二进制协议文件:
v = tf.Variable(0,name='my_variable')
sess = tf.Session()
tf.train.write_graph(sess.graph_def,'/tmp/tfmodel','train.pbtxt')
with tf.Session() as _sess:
with grile.FastGFile("/tmp/tfmodel/train.pbtxt",'rb') as f:
graph_def = tf.GraphDef()
graph_def.ParseFromString(f.read())
_sess.graph.as_default()
tf.import_graph_def(graph_def,name='tfgraph')
队列、线程。
队列(queue),图节点,有状态节点。其他节点(入队节点enqueue,出队节点depueue),修改它内容。入队节点把新元素插到队列末尾,出队节点把队列前面元素删除。
FIFOQueue、RandomShuffleQueue,源代码在tensorflow-1.0.0/tensorflow/python/ops/data_flow_ops.py 。
FIFOQueue,创建先入先出队列。循环神经网络结构,读入训练样本需要有序。
创建含队列图:
import tensorflow as tf
#创建先入先出队列,初始化队列插入0.1、0.2、0.3数字
q = tf.FIFOQueue(3,"float")
init = q.enqueue_many(([0.1,0.2,0.3]))
#定义出队、+1、入队操作
x = q.dequeue()
q_inc = q.enqueue([y])
开户会话,执行2次q_inc操作,查看队列内容:
with tf.Session() as sess:
sess.run(init)
quelen = sess.run(q.size())
for i in range(2):
sess.run(q_inc) #执行2次操作,队列值变0.3,1.1,1.2
quelen = sess.run(q.size())
for i in range(quelen):
print(sess.run(q.dequeue())) #输出队列值
RandomShuffleQueue,创建随机队列。出队列,以随机顺序产生元素。训练图像样本,CNN网络结构,无序读入训练样本,每次随机产生一个训练样本。
异步计算时非常重要。TensorFlow会话支持多线程,在主线程训练操作,RandomShuffleQueue作训练输入,开多个线程准备训练样本。样本压入队列,主线程从队列每次取出mini-batch样本训练。
创建随机队列,最大长度10,出队后最小长度2:
q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtype="float")
开户会话,执行10次入队操作,8次出队操作:
sess = tf.Session()
for i in range(0,10): #10次入队
sess.run(q.enqueue(i))
for i in range(0,8): #8次出队
print(sess.run(q.dequeue()))
阻断,队列长度等于最小值,执行出队操作;队列长度等于最大值,执行入队操作。
设置绘画运行时等待时间解除阻断:
run_iptions = tf.RunOptions(timeout_in_ms = 10000) #等待10秒
sess.run(q.dequeue(),options=run_options)
except tf.errors.DeadlineExceededError:
print('out of range')
会话主线程入队操作,数据量很大时,入队操作从硬盘读取数据,放入内存,主线程需要等待入队操作完成,才能进行训练操作。会话运行多个线程,线程管理器QueueRunner创建一系列新线程进行入队操作,主线程继续使用数据,训练网张和读取数据是异步的,主线程训练网络,另一线程将数据从硬盘读入内存。
队列管理器。
创建含队列图:
q = tf.FIFOQueue(1000,"float")
counter = tf.Variable(0.0) #计数器
increment_op = tf.assign_add(counter,tf.constant(1.0)) #操作:给计算器加1
enqueue_op = q.enqueue(counter) #操作:计数器值加入队列
创建队列管理器QueueRunner,两个操作向队列q添加元素。只用一个线程:
qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op] * 1)
启动会话,从队列管理器qr创建线程:
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
enqueue_threads = qr.create_threads(sess,start=True) #启动入队线程
for i in range(10):
print(sess.run(q.dequeue()))
不是自然数列,线程被阴断。加1操作和入队操作不同步。加1操作执行多次后,才进行一次入队操作。主线程训练(出队操作)和读取数据线程的训练(入队操作)异步,主线程一直等待数据送入。
入队线程自顾执行,出队操作完成,程序无法结束。tf.train.Coordinator实现线程同步,终止其他线程。
线程、协调器。
协调器(coordinator)管理线程。
sess = tf.Session()
sess.run(tf.global_variables_initializer())
#Coordinator:协调器,协调线程间关系可以视为一种信号量,用来做同步
coord = tf.train.Coordinator()
#启动入队线程,协调器是线程的参数
enqueue_threads = qr.create_threads(sess,coord = coord,start=True)
for i in range(0,10):
print(sess.run(q.dequeue()))
coord.request_stop() #通知其他线程关闭
coord.join(enqueue_threads) #join操作等待其他线程结束,其他所有线程关闭之后,函数才能返回
关闭队列线程,执行出队操作,抛出tf.errors.OutOfRange错误。coord.request_stop()和主线程出队操作q.dequeue()调换位置。
coord.request_stop()
for i in range(0,10):
print(sess.run(q.dequeue()))
coord.join(enqueue_threads)
tf.errors.OutOfRangeError捕捉错误:
coord.request_stop()
for i in range(0,10):
print(sess.run(q.dequeue()))
except tf.errors.OutOfRangeError:
coord.join(enqueue_threads)
所有队列管理器默认加在图tf.GraphKeys.QUEUE_RENNERS集合。
加载数据。
预加载数据(preloaded data),在TensorFlow图中定义常量或变量保存所有数据。填充数据(feeding),Python产生数据,数据填充后端。从文件读取数据(reading from file),队列管理器从文件中读取数据。
预加载数据。数据直接嵌在数据流图中,训练数据大时,很消耗内存。
x1 = tf.constant([2,3,4])
x2 = tf.constant([4,0,1])
y = tf.add(x1,x2)
填充数据。sess.run()中的feed_dict参数,Python产生数据填充后端。数据量大、消耗内存,数据类型转换等中间环节增加开销。
import tensorflow as tf
a1 = tf.placeholder(tf.int16)
a2 = tf.placeholder(tf.int16)
b = tf.add(x1,x2)
#用Python产生数据
li1 = [2,3,4]
li2 = [4,0,1]
#打开会话,数据填充后端
with tf.Session() as sess:
print sess.run(b,feed_dict={a1:li1,a2:li2})
从文件读取数据。图中定义好文件读取方法,TensorFlow从文件读取数据,解码成可使用样本集。
把样本数据写入TFRecords二进制文件。再从队列读取。
TFRecords二进制文件,更好利用内存,方便复制和移动,不需要单独标记文件。tensorflow-1.1.0/tensorflow/examples/how_tos/reading_data/convert_to_records.py。
生成TFRecords文件。定义主函数,给训练、验证、测试数据集做转换。获取数据。编码uint8。数据转换为tf.train.Example类型,写入TFRecords文件。转换函数convert_to,数据填入tf.train.Example协议缓冲区(protocol buffer),协议组冲区序列转为字符串,tf.python_io.TFRecordWriter写入TFRecords文件。55000个训练数据,5000个验证数据,10000个测试数据。黑白图像,单通道。写入协议缓冲区,height、width、depth、label编码int64类型,image_raw编码成二进制。序列化为字符串。运行结束,在/tmp/data生成文件train.tfrecords、validation.tfrecords、test.tfrecords。
从队列读取。创建张量,从二进制文件读取一个样本。创建张量,从二进制文件随机读取一个mini-batch。把每一批张量传入网络作为输入节点。代码 tensorflow-1.1.0/tensorflow/examples/how_tos/reading_data/fully_connected_reader.py。首先定义从文件中读取并解析一个样本。输入文件名队列。解析example。写明features里key名称。图片 string类型。标记 int64类型。BytesList 重新解码,把string类型0维Tensor变成unit8类型一维Tensor。image张量形状Tensor(“input/sub:0”,shape=(784,),dtype=float32)。把标记从uint8类型转int32类型。label张量形状Tensor(“input/Cast_1:0”,shape=(),dtype=int32)。
tf.train.shuffle_batch样本随机化,获得最小批次张量。输入参数,train 选择输入训练数据/验证数据,batch_size 训练每批样本数,num_epochs 过几遍数据 设置0/None表示永远训练下去。返回结果,images,类型float, 形状[batch_size,mnist.IMAGE_PIXELS],范围[-0.5,0.5]。labels,类型int32,形状[batch_size],范围[0,mnist.NUM_CLASSES]。tf.train.QueueRunner用tf.train.start_queue_runners()启动线程。获取文件路径,/tmp/data/train.tfrecords, /tmp/data/validation.records。tf.train.string_input_producer返回一个QueueRunner,里面有一个FIFOQueue。如果样本量很大,分成若干文件,文件名列表传入。随机化example,规整成batch_size大小。留下部分队列,保证每次有足够数据做随机打乱。
生成batch张量作网络输入,训练。输入images、labels。构建从揄模型预测数据的图。定义损失函数。加入图操作,训练模型。初始化参数,string_input_producer内疗创建一个epoch计数变量,归入tf.GraphKeys.LOCAL_VARIABLES集合,单独用initialize_local_variables()初始化。开启输入入阶线程。进入永久循环。每100次训练输出一次结果。通知其他线程关闭。数据集大小55000,2轮训练,110000个数据,batch_size大小100,训练次数1100次,每100次训练输出一次结果,输出11次结果。
TensorFlow使用TFRecords文件训练样本步骤:在生成文件名队列中,设定epoch数量。训练时,设定为无穷循环。在读取数据时,如果捕捉到错误,终止。
实现自定义操作。
需要熟练掌握C++语言。对张量流动和前向传播、反向传播有深理解。
步骤。在C++文件(*_ops.cc文件)中注册新操作。定义操作功能接口规范,操作名称、输入、输出、属性等。在C++文件(*_kenels.cc文件)实现操作,可以实现在CPU、GPU多个内核上。测试操作,编译操作库文件(*_ops.so文件),Python使用操作。
最佳实践。词嵌入。源代码
第一步,创建word2vec_ops.cc注册两个操作,SkipgramWord2vec、NegTrainWord2vec。
第二步,两个操作在CPU设备实现,生成word2vec_kernels.cc文件。
第三步,编译操作类文件,测试。在特定头文件目录下编译,Python提供get_include获取头文件目录,C++编译器把操作编译成动态库。TensorFlow Python API提供tf.load_op_library函数加载动态库,向TensorFlow 框架注册操作。load_op_library返回包含操作和内核Python模块。
参考资料:
《TensorFlow技术解析与实战》
欢迎付费咨询(150元每小时),我的微信:qingxingfengzi
本文已收录于以下专栏:
相关文章推荐
对于机器学习,尤其是深度学习DL的算法,模型训练可能很耗时,几个小时或者几天,所以如果是测试模块出了问题,每次都要重新运行就显得很浪费时间,所以如果训练部分没有问题,那么可以直接将训练的模型保存起来,...
tensorflow训练模型通常使用python api编写,简单记录下这些模型保存后怎么在java中调用。
python中训练完成,模型保存使用如下api保存:
# 保存二进制模型
原文地址:http://blog.csdn.net/lujiandong1/article/details/
TensorFlow是一种符号编程框架(与theano类似),先构建...
学习tensorflow的目的是能够训练的模型,并且利用已经训练好的模型对新数据进行预测。下文就是一个简单的保存模型加载模型的过程。
import ten...
学习tensorflow的目的是能够训练的模型,并且利用已经训练好的模型对新数据进行预测。下文就是一个简单的保存模型加载模型的过程。
import tenso...
AlexNet(Alex Krizhevsky,ILSVRC2012冠军)适合做图像分类。层自左向右、自上向下读取,关联层分为一组,高度、宽度减小,深度增加。深度增加减少网络计算量。训练模型数据集 S...
TensorFlow支持JPG、PNG图像格式,RGB、RGBA颜色空间。图像用与图像尺寸相同(height*width*chnanel)张量表示。通道表示为包含每个通道颜色数量标量秩1张量。图像所有...
前言整体步骤在TensorFlow中进行模型训练时,在官网给出的三种读取方式,中最好的文件读取方式就是将利用队列进行文件读取,而且步骤有两步:
1. 把样本数据写入TFRecords二进制文件
这一篇其实本该放在后面写的,只是今天正好把这个整出来了,所以为了防止后面忘记,就在这里先写了。
在运行index.html文件的时候,引擎首先会把资源进行加载,这是为了让游戏在运行时更为流畅,避免了在...
他的最新文章
讲师:王哲涵
讲师:韦玮
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)一步步带你探究如何高效使用TensorFlow - 简书
一步步带你探究如何高效使用TensorFlow
摘要:正在学习TensorFlow,利用效率不够高?不懂TensorFlow里面的奥秘?看大神如何一步步教你如何高效使用TensorFlow!
Tensorflow基础知识:
Tensorflow和其他数字计算库(如numpy)之间最明显的区别在于Tensorflow中操作的是符号。这是一个强大的功能,这保证了Tensorflow可以做很多其他库(例如numpy)不能完成的事情(例如自动区分)。这可能也是它更复杂的原因。今天我们来一步步探秘Tensorflow,并为更有效地使用Tensorflow提供了一些指导方针和最佳实践。
我们从一个简单的例子开始,我们要乘以两个随机矩阵。首先我们来看一下在numpy中如何实现:
现在我们使用Tensorflow中执行完全相同的计算:
与立即执行计算并将结果复制给输出变量z的numpy不同,tensorflow只给我们一个可以操作的张量类型。如果我们尝试直接打印z的值,我们得到这样的东西:
由于两个输入都是已经定义的类型,tensorFlow能够推断张量的符号及其类型。为了计算张量的值,我们需要创建一个会话并使用Session.run方法进行评估。
要了解如此强大的符号计算到底是什么,我们可以看看另一个例子。假设我们有一个曲线的样本(例如f(x)= 5x ^ 2 + 3),并且我们要估计f(x)在不知道它的参数的前提下。我们定义参数函数为g(x,w)= w0 x ^ 2 + w1 x + w2,它是输入x和潜在参数w的函数,我们的目标是找到潜在参数,使得g(x, w)≈f(x)。这可以通过最小化损失函数来完成:L(w)=(f(x)-g(x,w))^ 2。虽然这问题有一个简单的封闭式的解决方案,但是我们选择使用一种更为通用的方法,可以应用于任何可以区分的任务,那就是使用随机梯度下降。我们在一组采样点上简单地计算相对于w的L(w)的平均梯度,并沿相反方向移动。
以下是在Tensorflow中如何完成:
通过运行这段代码,我们可以看到下面这组数据:
[4.., 3.4504161]
这与我们的参数已经相当接近。
这只是Tensorflow可以做的冰山一角。许多问题,如优化具有数百万个参数的大型神经网络,都可以在Tensorflow中使用短短的几行代码高效地实现。而且Tensorflow可以跨多个设备和线程进行扩展,并支持各种平台。
理解静态形状和动态形状的区别:
Tensorflow中的张量在图形构造期间具有静态的形状属性。例如,我们可以定义一个形状的张 量[None,128]:
这意味着第一个维度可以是任意大小的,并且将在Session.run期间随机确定。Tensorflow有一个非常简单的API来展示静态形状:
为了获得张量的动态形状,你可以调用tf.shape op,它将返回一个表示给定形状的张量:
我们可以使用Tensor.set_shape()方法设置张量的静态形状:
实际上使用tf.reshape()操作更为安全:
这里有一个函数可以方便地返回静态形状,当静态可用而动态不可用的时候。
现在想象一下,如果我们要将三维的张量转换成二维的张量。在TensorFlow中我们可以使用get_shape()函数:
请注意,无论是否静态指定形状,都可以这样做。
实际上,我们可以写一个通用的重塑功能来如何维度之间的转换:
然后转化为二维就变得非常容易了:
广播机制(broadcasting)的好与坏:
Tensorflow同样支持广播机制。当要执行加法和乘法运算时,你需要确保操作数的形状匹配,例如,你不能将形状[3,2]的张量添加到形状的张量[3,4]。但有一个特殊情况,那就是当你有一个单一的维度。Tensorflow隐含地功能可以将张量自动匹配另一个操作数的形状。例如:
广播允许我们执行隐藏的功能,这使代码更简单,并且提高了内存的使用效率,因为我们不需要再使用其他的操作。为了连接不同长度的特征,我们通常平铺式的输入张量。这是各种神经网络架构的最常见模式:
这可以通过广播机制更有效地完成。我们使用f(m(x + y))等于f(mx + my)的事实。所以我们可以分别进行线性运算,并使用广播进行隐式级联:
pa = tf.layers.dense(a, 10, activation=None)
实际上,这段代码很普遍,只要在张量之间进行广播就可以应用于任意形状的张量:
到目前为止,我们讨论了广播的好的部分。但是你可能会问什么坏的部分?隐含的假设总是使调试更加困难,请考虑以下示例:
你认为C的数值是多少如果你猜到6,那是错的。这是因为当两个张量的等级不匹配时,Tensorflow会在元素操作之前自动扩展具有较低等级的张量,因此加法的结果将是[[2,3],[3,4]]。
如果我们指定了我们想要减少的维度,避免这个错误就变得很容易了:
这里c的值将是[5,7]。
使用Python实现原型内核和高级可视化的操作:
为了提高效率,Tensorflow中的操作内核完全是用C ++编写,但是在C ++中编写Tensorflow内核可能会相当痛苦。。使用tf.py_func(),你可以将任何python代码转换为Tensorflow操作。
例如,这是python如何在Tensorflow中实现一个简单的ReLU非线性内核:
要验证梯度是否正确,你可以使用Tensorflow的梯度检查器:
compute_gradient_error()是以数字的方式计算梯度,并返回与渐变的差异,因为我们想要的是一个很小的差异。
请注意,此实现效率非常低,只对原型设计有用,因为python代码不可并行化,不能在GPU上运行。
在实践中,我们通常使用python ops在Tensorboard上进行可视化。试想一下你正在构建图像分类模型,并希望在训练期间可视化你的模型预测。Tensorflow允许使用函数tf.summary.image()进行可视化:
但这只能显示输入图像,为了可视化预测,你必须找到一种方法来添加对图像的注释,这对于现有操作几乎是不可能的。一个更简单的方法是在python中进行绘图,并将其包装在一个python方法中:
请注意,由于概要通常只能在一段时间内进行评估(不是每步),因此实施中可以使用该实现,而不用担心效率。
本文由北邮@爱可可-爱生活老师推荐,@阿里云云栖社区组织翻译。
文章原标题《Effective Tensorflow - Guides and best practices for effective use of Tensorflow》
作者:Vahid Kazemi 作者是google的件工程师,CS中的博士学位。从事机器学习,NLP和计算机视觉工作。
译者:袁虎
汇集阿里技术精粹,线程间同步方式总结-爱编程
线程间同步方式总结
/Creator/archive//2455584.html
http://blog.csdn.net/hongmy525/article/details/5194006
http://blog.csdn.net/qinxiongxu/article/details/7830537
/diyingyun/archive//2275229.html
信号量与普通整型变量的区别:
①信号量(semaphore)是非负整型变量,除了初始化之外,它只能通过两个标准原子操作:wait(semap) , signal(semap) ; 来进行访问;
②操作也被成为PV原语(P来源于Dutch proberen&测试&,V来源于Dutch verhogen&增加&),而普通整型变量则可以在任何语句块中被访问;&
信号量与互斥锁之间的区别:
1. 互斥量用于线程的互斥,信号线用于线程的同步。&&
这是互斥量和信号量的根本区别,也就是互斥和同步之间的区别。&&
互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。&&
同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源&&
2. 互斥量值只能为0/1,信号量值可以为非负整数。&&
也就是说,一个互斥量只能用于一个资源的互斥访问,它不能实现多个资源的多线程互斥问题。信号量可以实现多个同类资源的多线程互斥和同步。当信号量为单值信号量是,也可以完成一个资源的互斥访问。&&
3. 互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到。
信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确、合理的使用公共资源。&
信号量可以分为几类:&&
2 二进制信号量(binary semaphore):只允许信号量取0或1值,其同时只能被一个线程获取。&&
2 整型信号量(integer semaphore):信号量取值是整数,它可以被多个线程同时获得,直到信号量的值变为0。&&
2 记录型信号量(record semaphore):每个信号量s除一个整数值value(计数)外,还有一个等待队列List,其中是阻塞在该信号量的各个线程的标识。当信号量被释放一个,值被加一后,系统自动从等待队列中唤醒一个等待中的线程,让其获得信号量,同时信号量再减一。&&
信号量通过一个计数器控制对共享资源的访问,信号量的值是一个非负整数,所有通过它的线程都会将该整数减一。如果计数器大于0,则访问被允许,计数器减1;如果为0,则访问被禁止,所有试图通过它的线程都将处于等待状态。&&
计数器计算的结果是允许访问共享资源的通行证。因此,为了访问共享资源,线程必须从信号量得到通行证, 如果该信号量的计数大于0,则此线程获得一个通行证,这将导致信号量的计数递减,否则,此线程将阻塞直到获得一个通行证为止。当此线程不再需要访问共享资源时,它释放该通行证,这导致信号量的计数递增,如果另一个线程等待通行证,则那个线程将在那时获得通行证。&&
Semaphore可以被抽象为五个操作:&&
- 创建 Create&&
- 等待 Wait:&&
线程等待信号量,如果值大于0,则获得,值减一;如果只等于0,则一直线程进入睡眠状态,知道信号量值大于0或者超时。&&
-释放 Post&&
执行释放信号量,则值加一;如果此时有正在等待的线程,则唤醒该线程。&&
-试图等待 TryWait&&
如果调用TryWait,线程并不真正的去获得信号量,还是检查信号量是否能够被获得,如果信号量值大于0,则TryWait返回成功;否则返回失败。&&
-销毁 Destroy&&
信号量,是可以用来保护两个或多个关键代码段,这些关键代码段不能并发调用。在进入一个关键代码段之前,线程必须获取一个信号量。如果关键代码段中没有任何线程,那么线程会立即进入该框图中的那个部分。一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量,然后将Acquire Semaphore VI以及Release Semaphore VI分别放置在每个关键代码段的首末端。确认这些信号量VI引用的是初始创建的信号量。
&CreateSemaphore
&WaitForSingleObject
&sem _wait
&ReleaseMutex
&sem _post
&WaitForSingleObject
&sem _trywait
&CloseHandle
&sem_destroy
互斥量(Mutex)&&
互斥量表现互斥现象的数据结构,也被当作二元信号灯。一个互斥基本上是一个多任务敏感的二元信号,它能用作同步多任务的行为,它常用作保护从中断来的临界段代码并且在共享同步使用的资源。&&
Mutex本质上说就是一把锁,提供对资源的独占访问,所以Mutex主要的作用是用于互斥。Mutex对象的值,只有0和1两个值。这两个值也分别代表了Mutex的两种状态。值为0, 表示锁定状态,当前对象被锁定,用户进程/线程如果试图Lock临界资源,则进入排队等待;值为1,表示空闲状态,当前对象为空闲,用户进程/线程可以Lock临界资源,之后Mutex值减1变为0。&&
Mutex可以被抽象为四个操作:&&
- 创建 Create&&
- 加锁 Lock&&
- 解锁 Unlock&&
- 销毁 Destroy&&
Mutex被创建时可以有初始值,表示Mutex被创建后,是锁定状态还是空闲状态。在同一个线程中,为了防止死锁,系统不允许连续两次对Mutex加锁(系统一般会在第二次调用立刻返回)。也就是说,加锁和解锁这两个对应的操作,需要在同一个线程中完成。&&
不同操作系统中提供的Mutex函数: 动作\系统
&CreateMutex
&pthread_mutex_init
&mutex_init
&WaitForSingleObject
&pthread_mutex_lock
&mutex_lock
&ReleaseMutex
&pthread_mutex_unlock
&mutex_unlock
&CloseHandle
&pthread_mutex_destroy
&mutex_destroy
&33250人阅读&&&
嵌入式linux开发(28)&
&285人阅读&&&&
一.什么是信号量
信号量的使用主要是用来保护共享资源,使得资源在一个时刻只有一个进程(线程)
信号量的值为正的时候,说明它空闲。所测试的线程可以锁定而使用它。若为0,说明
它被占用,测试的线程要进入睡眠队列中,等待被唤醒。
二.信号量的分类
在学习信号量之前,我们必须先知道——Linux提供两种信号量:
(1) 内核信号量,由内核控制路径使用
(2) 用户态进程使用的信号量,这种信号量又分为POSIX信号量和SYSTEM
POSIX信号量又分为有名信号量和无名信号量。
有名信号量,其值保存在文件中, 所以它可以用于线程也可以用于进程间的同步。无名
信号量,其值保存在内存中。
倘若对信号量没有以上的全面认识的话,你就会很快发现自己在信号量的森林里迷
失了方向。
三.内核信号量
1.内核信号量的构成
内核信号量类似于自旋锁,因为当锁关闭着时,它不允许内核控制路径继续进行。然而,
当内核控制路径试图获取内核信号量锁保护的忙资源时,相应的进程就被挂起。只有在资源
被释放时,进程才再次变为可运行。
只有可以睡眠的函数才能获取内核信号量;中断处理程序和可延迟函数都不能使用内
核信号量。
内核信号量是struct semaphore类型的对象,它在&asm/semaphore.h&中定义:
struct semaphore {
   atomic_
   wait_queue_head_
count:相当于信号量的值,大于0,资源空闲;等于0,资源忙,但没有进程等待这
个保护的资源;小于0,资源不可用,并至少有一个进程等待资源。
wait:存放等待队列链表的地址,当前等待资源的所有睡眠进程都会放在这个链表中。
sleepers:存放一个标志,表示是否有一些进程在信号量上睡眠。
2.内核信号量中的等待队列(删除,没有联系)
上面已经提到了内核信号量使用了等待队列wait_queue来实现阻塞操作。
当某任务由于没有某种条件没有得到满足时,它就被挂到等待队列中睡眠。当条件得到满足
时,该任务就被移出等待队列,此时并不意味着该任务就被马上执行,因为它又被移进工
作队列中等待CPU资源,在适当的时机被调度。
内核信号量是在内部使用等待队列的,也就是说该等待队列对用户是隐藏的,无须用
户干涉。由用户真正使用的等待队列我们将在另外的篇章进行详解。
3.内核信号量的相关函数
(1)初始化:
void sema_init (struct semaphore *sem, int val);
void init_MUTEX (struct semaphore *sem); //将sem的值置为1,表示资源空闲
void init_MUTEX_LOCKED (struct semaphore *sem); //将sem的值置为0,表示资源忙
(2)申请内核信号量所保护的资源:
void down(struct semaphore * sem); // 可引起睡眠
int down_interruptible(struct semaphore * sem); // down_interruptible能被信号打断
int down_trylock(struct semaphore * sem); // 非阻塞函数,不会睡眠。无法锁定资源则
(3)释放内核信号量所保护的资源:
void up(struct semaphore * sem);
4.内核信号量的使用例程
在驱动程序中,当多个线程同时访问相同的资源时(驱动中的全局变量时一种典型的
共享资源),可能会引发“竞态“,因此我们必须对共享资源进行并发控制。Linux内核中
解决并发控制的最常用方法是自旋锁与信号量(绝大多数时候作为互斥锁使用)。
ssize_t globalvar_write(struct file *filp, const char *buf, size_t len, loff_t *off)
 //获得信号量
 if (down_interruptible(&sem))
  return - ERESTARTSYS;
 //将用户空间的数据复制到内核空间的global_var
 if (copy_from_user(&global_var, buf, sizeof(int)))
  up(&sem);
  return - EFAULT;
 //释放信号量
 up(&sem);
 return sizeof(int);
四.POSIX 信号量与SYSTEM V信号量的比较
1. 对POSIX来说,信号量是个非负整数。常用于线程间同步。
而SYSTEM V信号量则是一个或多个信号量的集合,它对应的是一个信号量结构体,
这个结构体是为SYSTEM V IPC服务的,信号量只不过是它的一部分。常用于进程间同步。
2.POSIX信号量的引用头文件是“&semaphore.h&”,而SYSTEM V信号量的引用头文件是
“&sys/sem.h&”。
3.从使用的角度,System V信号量是复杂的,而Posix信号量是简单。比如,POSIX信
号量的创建和初始化或PV操作就很非常方便。
五.POSIX信号量详解
1.无名信号量
无名信号量的创建就像声明一般的变量一样简单,例如:sem_t sem_id。然后再初
始化该无名信号量,之后就可以放心使用了。
无名信号量常用于多线程间的同步,同时也用于相关进程间的同步。也就是说,无名信
号量必须是多个进程(线程)的共享变量,无名信号量要保护的变量也必须是多个进程
(线程)的共享变量,这两个条件是缺一不可的。
常见的无名信号量相关函数:sem_destroy
int sem_init(sem_t *sem, int pshared, unsigned int value);
1)pshared==0 用于同一多线程的同步;
2)若pshared&0 用于多个相关进程间的同步(即由fork产生的)
int sem_getvalue(sem_t *sem, int *sval);
取回信号量sem的当前值,把该值保存到sval中。
若有1个或更多的线程或进程调用sem_wait阻塞在该信号量上,该函数返回两种值:
2) 返回阻塞在该信号量上的进程或线程数目
linux采用返回的第一种策略。
sem_wait(或sem_trywait)相当于P操作,即申请资源。
int sem_wait(sem_t *sem); // 这是一个阻塞的函数
测试所指定信号量的值,它的操作是原子的。
若sem&0,那么它减1并立即返回。
若sem==0,则睡眠直到sem&0,此时立即减1,然后返回。
int sem_trywait(sem_t *sem); // 非阻塞的函数
其他的行为和sem_wait一样,除了:
若sem==0,不是睡眠,而是返回一个错误EAGAIN。
sem_post相当于V操作,释放资源。
int sem_post(sem_t *sem);
把指定的信号量sem的值加1;
呼醒正在等待该信号量的任意线程。
注意:在这些函数中,只有sem_post是信号安全的函数,它是可重入函数
(a)无名信号量在多线程间的同步
无名信号量的常见用法是将要保护的变量放在sem_wait和sem_post中间所形成的
临界区内,这样该变量就会被保护起来,例如:
#include &pthread.h&
#include &semaphore.h&
#include &sys/types.h&
#include &stdio.h&
#include &unistd.h&
// 被保护的全局变量
sem_t sem_
void* thread_one_fun(void *arg)
sem_wait(&sem_id);
printf(&thread_one have the semaphore\n&);
number++;
printf(&number = %d\n&,number);
sem_post(&sem_id);
void* thread_two_fun(void *arg)
sem_wait(&sem_id);
printf(&thread_two have the semaphore \n&);
printf(&number = %d\n&,number);
sem_post(&sem_id);
int main(int argc,char *argv[])
number = 1;
pthread_t id1, id2;
sem_init(&sem_id, 0, 1);
pthread_create(&id1,NULL,thread_one_fun, NULL);
pthread_create(&id2,NULL,thread_two_fun, NULL);
pthread_join(id1,NULL);
pthread_join(id2,NULL);
printf(&main,,,\n&);
上面的例程,到底哪个线程先申请到信号量资源,这是随机的。如果想要某个特定的顺
序的话,可以用2个信号量来实现。例如下面的例程是线程1先执行完,然后线程2才继
续执行,直至结束。
// 被保护的全局变量
sem_t sem_id1, sem_id2;
void* thread_one_fun(void *arg)
sem_wait(&sem_id1);
printf(&thread_one have the semaphore\n&);
number++;
printf(&number = %d\n&,number);
sem_post(&sem_id2);
void* thread_two_fun(void *arg)
sem_wait(&sem_id2);
printf(&thread_two have the semaphore \n&);
printf(&number = %d\n&,number);
sem_post(&sem_id1);
int main(int argc,char *argv[])
number = 1;
pthread_t id1, id2;
sem_init(&sem_id1, 0, 1); // 空闲的
sem_init(&sem_id2, 0, 0); // 忙的
pthread_create(&id1,NULL,thread_one_fun, NULL);
pthread_create(&id2,NULL,thread_two_fun, NULL);
pthread_join(id1,NULL);
pthread_join(id2,NULL);
printf(&main,,,\n&);
(b)无名信号量在相关进程间的同步
说是相关进程,是因为本程序中共有2个进程,其中一个是另外一个的子进程(由
产生)的。
本来对于fork来说,子进程只继承了父进程的代码副本,mutex理应在父子进程
中是相互独立的两个变量,但由于在初始化mutex的时候,由pshared = 1指
定了mutex处于共享内存区域,所以此时mutex变成了父子进程共享的一个变
量。此时,mutex就可以用来同步相关进程了。
#include &semaphore.h&
#include &stdio.h&
#include &errno.h&
#include &stdlib.h&
#include &unistd.h&
#include &sys/types.h&
#include &sys/stat.h&
#include &fcntl.h&
#include &sys/mman.h&
int main(int argc, char **argv)
int fd, i,count=0,nloop=10,zero=0,*
//open a file and map it into memory
fd = open(&log.txt&,O_RDWR|O_CREAT,S_IRWXU);
write(fd,&zero,sizeof(int));
ptr = mmap( NULL,sizeof(int),PROT_READ |
PROT_WRITE,MAP_SHARED,fd,0 );
close(fd);
/* create, initialize semaphore */
if( sem_init(&mutex,1,1) & 0) //
perror(&semaphore initilization&);
if (fork() == 0)
{ /* child process*/
for (i = 0; i & i++)
sem_wait(&mutex);
printf(&child: %d\n&, (*ptr)++);
sem_post(&mutex);
/* back to parent process */
for (i = 0; i & i++)
sem_wait(&mutex);
printf(&parent: %d\n&, (*ptr)++);
sem_post(&mutex);
2.有名信号量
有名信号量的特点是把信号量的值保存在文件中。
这决定了它的用途非常广:既可以用于线程,也可以用于相关进程间,甚至是不相关
(a)有名信号量能在进程间共享的原因
由于有名信号量的值是保存在文件中的,所以对于相关进程来说,子进程是继承了父
进程的文件描述符,那么子进程所继承的文件描述符所指向的文件是和父进程一样的,当
然文件里面保存的有名信号量值就共享了。
(b)有名信号量相关函数说明
有名信号量在使用的时候,和无名信号量共享sem_wait和sem_post函数。
区别是有名信号量使用sem_open代替sem_init,另外在结束的时候要像关闭文件
一样去关闭这个有名信号量。
(1)打开一个已存在的有名信号量,或创建并初始化一个有名信号量。一个单一的调用就完
成了信号量的创建、初始化和权限的设置。
sem_t *sem_open(const char *name, int oflag, mode_t mode , int value);
name是文件的路径名;
Oflag 有O_CREAT或O_CREAT|EXCL两个取值;
mode_t控制新的信号量的访问权限;
Value指定信号量的初始化值。
这里的name不能写成/tmp/aaa.sem这样的格式,因为在linux下,sem都是创建
在/dev/shm目录下。你可以将name写成“/mysem”或“mysem”,创建出来的文件都
是“/dev/shm/sem.mysem”,千万不要写路径。也千万不要写“/tmp/mysem”之类的。
当oflag = O_CREAT时,若name指定的信号量不存在时,则会创建一个,而且后
面的mode和value参数必须有效。若name指定的信号量已存在,则直接打开该信号量,
同时忽略mode和value参数。
当oflag = O_CREAT|O_EXCL时,若name指定的信号量已存在,该函数会直接返
(2) 一旦你使用了一信号量,销毁它们就变得很重要。
在做这个之前,要确定所有对这个有名信号量的引用都已经通过sem_close()函数
关闭了,然后只需在退出或是退出处理函数中调用sem_unlink()去删除系统中的信号量,
注意如果有任何的处理器或是线程引用这个信号量,sem_unlink()函数不会起到任何的作
也就是说,必须是最后一个使用该信号量的进程来执行sem_unlick才有效。因为每个
信号灯有一个引用计数器记录当前的打开次数,sem_unlink必须等待这个数为0时才能把
name所指的信号灯从文件系统中删除。也就是要等待最后一个sem_close发生。
(c)有名信号量在无相关进程间的同步
前面已经说过,有名信号量是位于共享内存区的,那么它要保护的资源也必须是位于
共享内存区,只有这样才能被无相关的进程所共享。
在下面这个例子中,服务进程和客户进程都使用shmget和shmat来获取得一块共享内
存资源。然后利用有名信号量来对这块共享内存资源进行互斥保护。
&u&File1: server.c &/u&
#include &sys/types.h&
#include &sys/ipc.h&
#include &sys/shm.h&
#include &stdio.h&
#include &semaphore.h&
#include &sys/types.h&
#include &sys/stat.h&
#include &fcntl.h&
#define SHMSZ 27
char SEM_NAME[]= &vik&;
int main()
char *shm,*s;
//name the shared memory segment
key = 1000;
//create & initialize semaphore
mutex = sem_open(SEM_NAME,O_CREAT,0644,1);
if(mutex == SEM_FAILED)
perror(&unable to create semaphore&);
sem_unlink(SEM_NAME);
//create the shared memory segment with this key
shmid = shmget(key,SHMSZ,IPC_CREAT|0666);
if(shmid&0)
perror(&failure in shmget&);
//attach this segment to virtual memory
shm = shmat(shmid,NULL,0);
//start writing into memory
for(ch='A';ch&='Z';ch++)
sem_wait(mutex);
*s++ =
sem_post(mutex);
//the below loop could be replaced by binary semaphore
while(*shm != '*')
sem_close(mutex);
sem_unlink(SEM_NAME);
shmctl(shmid, IPC_RMID, 0);
&u&File 2: client.c&/u&
#include &sys/types.h&
#include &sys/ipc.h&
#include &sys/shm.h&
#include &stdio.h&
#include &semaphore.h&
#include &sys/types.h&
#include &sys/stat.h&
#include &fcntl.h&
#define SHMSZ 27
char SEM_NAME[]= &vik&;
int main()
char *shm,*s;
//name the shared memory segment
key = 1000;
//create & initialize existing semaphore
mutex = sem_open(SEM_NAME,0,0644,0);
if(mutex == SEM_FAILED)
perror(&reader:unable to execute semaphore&);
sem_close(mutex);
//create the shared memory segment with this key
shmid = shmget(key,SHMSZ,0666);
if(shmid&0)
perror(&reader:failure in shmget&);
//attach this segment to virtual memory
shm = shmat(shmid,NULL,0);
//start reading
for(s=*s!=NULL;s++)
sem_wait(mutex);
putchar(*s);
sem_post(mutex);
//once done signal exiting of reader:This can be replaced by
another semaphore
*shm = '*';
sem_close(mutex);
shmctl(shmid, IPC_RMID, 0);
六.SYSTEM V信号量
这是信号量值的集合,而不是单个信号量。相关的信号量操作函数由&sys/ipc.h&引用。
1.信号量结构体
内核为每个信号量集维护一个信号量结构体,可在&sys/sem.h&找到该定义:
struct semid_ds {
struct ipc_perm sem_ /* 信号量集的操作许可权限 */
struct sem *sem_ /* 某个信号量sem结构数组的指针,当前信号量集
中的每个信号量对应其中一个数组元素 */
ushort sem_ /* sem_base 数组的个数 */
time_t sem_ /* 最后一次成功修改信号量数组的时间 */
time_t sem_ /* 成功创建时间 */
struct sem {
/* 信号量的当前值 */
/* 最后一次返回该信号量的进程ID 号 */
/* 等待semval大于当前值的进程个数 */
/* 等待semval变成0的进程个数 */
2.常见的SYSTEM V信号量函数
(a)关键字和描述符
SYSTEM V信号量是SYSTEM V IPC(即SYSTEM V进程间通信)的组成部分,其他
的有SYSTEM V消息队列,SYSTEM V共享内存。而关键字和IPC描述符无疑是它们的共
同点,也使用它们,就不得不先对它们进行熟悉。这里只对SYSTEM V信号量进行讨论。
IPC描述符相当于引用ID号,要想使用SYSTEM V信号量(或MSG、SHM),就必须
用IPC描述符来调用信号量。而IPC描述符是内核动态提供的(通过semget来获取),用
户无法让服务器和客户事先认可共同使用哪个描述符,所以有时候就需要到关键字KEY来
定位描述符。
某个KEY只会固定对应一个描述符(这项转换工作由内核完成),这样假如服务器和
客户事先认可共同使用某个KEY,那么大家就都能定位到同一个描述符,也就能定位到同
一个信号量,这样就达到了SYSTEM V信号量在进程间共享的目的。
(b)创建和打开信号量
int semget(key_t key, int nsems, int oflag)
(1) nsems&0 : 创建一个信的信号量集,指定集合中信号量的数量,一旦创建就不能更改。
(2) nsems==0 : 访问一个已存在的集合
(3) 返回的是一个称为信号量标识符的整数,semop和semctl函数将使用它。
(4) 创建成功后信号量结构被设置:
.sem_perm 的uid和gid成员被设置成的调用进程的有效用户ID和有效组ID
.oflag 参数中的读写权限位存入sem_perm.mode
.sem_otime 被置为0,sem_ctime被设置为当前时间
.sem_nsems 被置为nsems参数的值
该集合中的每个信号量不初始化,这些结构是在semctl,用参数SET_VAL,SETALL
初始化的。
semget函数执行成功后,就产生了一个由内核维持的类型为semid_ds结构体的信号量
集,返回semid就是指向该信号量集的引索。
(c)关键字的获取
有多种方法使客户机和服务器在同一IPC结构上会合:
(1) 服务器可以指定关键字IPC_PRIVATE创建一个新IPC结构,将返回的标识符存放在某
处(例如一个文件)以便客户机取用。关键字 IPC_PRIVATE保证服务器创建一个新IPC结
构。这种技术的缺点是:服务器要将整型标识符写到文件中,然后客户机在此后又要读文件
取得此标识符。
IPC_PRIVATE关键字也可用于父、子关系进程。父进程指定 IPC_PRIVATE创建一个新
IPC结构,所返回的标识符在fork后可由子进程使用。子进程可将此标识符作为exec函数
的一个参数传给一个新程序。
(2) 在一个公用头文件中定义一个客户机和服务器都认可的关键字。然后服务器指定此关键
字创建一个新的IPC结构。这种方法的问题是该关键字可能已与一个 IPC结构相结合,在
此情况下,get函数(msgget、semget或shmget)出错返回。服务器必须处理这一错误,删除
已存在的IPC结构,然后试着再创建它。当然,这个关键字不能被别的程序所占用。
(3) 客户机和服务器认同一个路径名和课题I D(课题I D是0 ~ 2 5 5之间的字符值) ,然
后调用函数ftok将这两个值变换为一个关键字。这样就避免了使用一个已被占用的关键字的
使用ftok并非高枕无忧。有这样一种例外:服务器使用ftok获取得一个关键字后,该文
件就被删除了,然后重建。此时客户端以此重建后的文件来ftok所获取的关键字就和服务器
的关键字不一样了。所以一般商用的软件都不怎么用ftok。
一般来说,客户机和服务器至少共享一个头文件,所以一个比较简单的方法是避免使
用ftok,而只是在该头文件中存放一个大家都知道的关键字。
(d)设置信号量的值(PV操作)
int semop(int semid, struct sembuf *opsptr, size_t nops);
(1) semid: 是semget返回的semid
(2)opsptr: 指向信号量操作结构数组
(3) nops : opsptr所指向的数组中的sembuf结构体的个数
struct sembuf {
short sem_ // 要操作的信号量在信号量集里的编号,
short sem_ // 信号量操作
short sem_ // 操作表示符
(4) 若sem_op 是正数,其值就加到semval上,即释放信号量控制的资源
若sem_op 是0,那么调用者希望等到semval变为0,如果semval是0就返回;
若sem_op 是负数,那么调用者希望等待semval变为大于或等于sem_op的绝对值
例如,当前semval为2,而sem_op = -3,那么怎么办?
注意:semval是指semid_ds中的信号量集中的某个信号量的值
(5) sem_flg
SEM_UNDO 由进程自动释放信号量
IPC_NOWAIT 不阻塞
到这里,读者肯定有个疑惑:semop希望改变的semval到底在哪里?我们怎么没看到
有它的痕迹?其实,前面已经说明了,当使用semget时,就产生了一个由内核维护的信号
量集(当然每个信号量值即semval也是只由内核才能看得到了),用户能看到的就是返回
的semid。内核通过semop 函数的参数,知道应该去改变semid 所指向的信号量的哪个
(e)对信号集实行控制操作(semval的赋值等)
int semctl(int semid, int semum, int cmd, ../* union semun arg */);
semid是信号量集合;
semnum是信号在集合中的序号;
semum是一个必须由用户自定义的结构体,在这里我们务必弄清楚该结构体的组成:
union semun
// cmd == SETVAL
struct semid_ds *buf // cmd == IPC_SET或者 cmd == IPC_STAT
ushort * // cmd == SETALL,或 cmd = GETALL
val只有cmd ==SETVAL时才有用,此时指定的semval = arg.val。
注意:当cmd == GETVAL时,semctl函数返回的值就是我们想要的semval。千万不要
以为指定的semval被返回到arg.val中。
array指向一个数组,当cmd==SETALL时,就根据arg.array来将信号量集的所有值都
赋值;当cmd ==GETALL时,就将信号量集的所有值返回到arg.array指定的数组中。
buf 指针只在cmd==IPC_STAT 或IPC_SET 时有用,作用是semid 所指向的信号量集
(semid_ds机构体)。一般情况下不常用,这里不做谈论。
另外,cmd == IPC_RMID还是比较有用的。
#include &sys/types.h&
#include &sys/ipc.h&
#include &sys/sem.h&
#include &stdio.h&
int errno=0;
union semun {
struct semid_ds *
unsigned short *
int main()
struct sembuf sops[2]; //要用到两个信号量,所以要定义两个操作数组
unsigned short argarray[80];
arg.array =
semid = semget(IPC_PRIVATE, 2, 0666);
if(semid & 0 )
printf(&semget failed. errno: %d\n&, errno);
//获取0th信号量的原始值
rslt = semctl(semid, 0, GETVAL);
printf(&val = %d\n&,rslt);
//初始化0th信号量,然后再读取,检查初始化有没有成功
arg.val = 1; // 同一时间只允许一个占有者
semctl(semid, 0, SETVAL, arg);
rslt = semctl(semid, 0, GETVAL);
printf(&val = %d\n&,rslt);
sops[0].sem_num = 0;
sops[0].sem_op = -1;
sops[0].sem_flg = 0;
sops[1].sem_num = 1;
sops[1].sem_op = 1;
sops[1].sem_flg = 0;
rslt=semop(semid, sops, 1); //申请0th信号量,尝试锁定
if (rslt & 0 )
printf(&semop failed. errno: %d\n&, errno);
//可以在这里对资源进行锁定
sops[0].sem_op = 1;
semop(semid, sops, 1); //释放0th信号量
rslt = semctl(semid, 0, GETVAL);
printf(&val = %d\n&,rslt);
rslt=semctl(semid, 0, GETALL, arg);
if (rslt & 0)
printf(&semctl failed. errno: %d\n&, errno);
printf(&val1:%d val2: %d\n&,(unsigned int)argarray[0],(unsigned int)argarray[1]);
if(semctl(semid, 1, IPC_RMID) == -1)
Perror(“semctl failure while clearing reason”);
return(0);
七.信号量的牛刀小试——生产者与消费者问题
1.问题描述:
有一个长度为N的缓冲池为生产者和消费者所共有,只要缓冲池未满,生产者便可将
消息送入缓冲池;只要缓冲池未空,消费者便可从缓冲池中取走一个消息。生产者往缓冲池
放信息的时候,消费者不可操作缓冲池,反之亦然。
2.使用多线程和信号量解决该经典问题的互斥
#include &pthread.h&
#include &stdio.h&
#include &semaphore.h&
#define BUFF_SIZE 10
char buffer[BUFF_SIZE];
// 缓冲池里的信息数目
sem_t sem_ // 生产者和消费者的互斥锁
sem_t p_sem_ // 空的时候,对消费者不可进
sem_t c_sem_ // 满的时候,对生产者不可进
void * Producer()
sem_wait(&p_sem_mutex); //当缓冲池未满时
sem_wait(&sem_mutex); //等待缓冲池空闲
count++;
sem_post(&sem_mutex);
if(count & BUFF_SIZE)//缓冲池未满
sem_post(&p_sem_mutex);
if(count & 0) //缓冲池不为空
sem_post(&c_sem_mutex);
void * Consumer()
sem_wait(&c_sem_mutex);//缓冲池未空时
sem_wait(&sem_mutex); //等待缓冲池空闲
sem_post(&sem_mutex);
if(count & 0)
sem_post(c_sem_nutex);
int main()
pthread_t ptid,
//initialize the semaphores
sem_init(&empty_sem_mutex,0,1);
sem_init(&full_sem_mutex,0,0);
//creating producer and consumer threads
if(pthread_create(&ptid, NULL,Producer, NULL))
printf(&\n ERROR creating thread 1&);
if(pthread_create(&ctid, NULL,Consumer, NULL))
printf(&\n ERROR creating thread 2&);
if(pthread_join(ptid, NULL)) /* wait for the producer to finish */
printf(&\n ERROR joining thread&);
if(pthread_join(ctid, NULL)) /* wait for consumer to finish */
printf(&\n ERROR joining thread&);
sem_destroy(&empty_sem_mutex);
sem_destroy(&full_sem_mutex);
//exit the main thread
pthread_exit(NULL);
最近看《UNIX环境高级编程》多线程同步,看到他举例说条件变量pthread_cond_t怎么用,愣是没有看懂,只好在网上找了份代码,跑了跑,才弄明白
[X61@horizon threads]$ gcc thread_cond.c -lpthread -o tcd
以下是程序运行结果:
[X61@horizon threads]$ ./tcd&
thread1: lock 30
thread1: unlock 40
thread2: lock 52
thread2: wait 1& 55
thread1: lock 30
thread1: unlock 40
thread1: lock 30
thread1:signal 1& 33
thread1:signal 2& 35
thread1: unlock 40
thread2: wait 2& 57
thread2: unlock 61
thread1: lock 30
thread1: unlock 40
thread2: lock 52
thread2: wait 1& 55
thread1: lock 30
thread1: unlock 40
thread1: lock 30
thread1:signal 1& 33
thread1:signal 2& 35
thread1: unlock 40
thread2: wait 2& 57
thread2: unlock 61
这里的两个关键函数就在pthread_cond_wait和pthread_cond_signal函数。
线程一先执行,获得mutex锁,打印,然后释放mutex锁,然后阻塞自己1秒。
线程二此时和线程一应该是并发的执行&,这里是一个要点,为什么说是线程此时是并发的执行,因为此时不做任何干涉的话,是没有办法确定是线程一先获得执行还是线程二先获得执行,到底那个线程先获得执行,取决于操作系统的调度,想刻意的让线程2先执行,可以让线程2一出来,先sleep一秒。
这里并发执行的情况是,线程一先进入循环,然后获得锁,此时估计线程二执行,阻塞在
pthread_mutex_lock(&mutex);
这行语句中,直到线程1释放mutex锁
pthread_mutex_unlock(&mutex);/*解锁互斥量*/
然后线程二得已执行,获取metux锁,满足if条件,到pthread_cond_wait&(&cond,&mutex);/*等待*/
这里的线程二阻塞,不仅仅是等待cond变量发生改变,同时释放mutex锁&,因为当时看书没有注意,所以这里卡了很久。
mutex锁释放后,线程1终于获得了mutex锁,得已继续运行,当线程1的if(i%3==0)的条件满足后,通过pthread_cond_signal发送信号,告诉等待cond的变量的线程(这个情景中是线程二),cond条件变量已经发生了改变。
不过此时线程二并没有立即得到运行&,因为线程二还在等待mutex锁的释放,所以线程一继续往下走,直到线程一释放mutex锁,线程二才能停止等待,打印语句,然后往下走通过pthread_mutex_unlock(&mutex)释放mutex锁,进入下一个循环。
最近一直在想怎么高效率的在IO线程接收到数据时通知逻辑线程(基于线程池)工作的问题,像网络编程的服务器模型的一些模型都需要用到这个实现,下面我这里简单的罗列一个多线程的网络服务器模型
半同步/半异步(half-sync/half-async):&
许多餐厅使用&半同步/半异步&模式的变体。例如,餐厅常常雇佣一个领班负责迎接顾客,并在餐厅繁忙时留意给顾客安排桌位,为等待就餐的顾客按序排队是必要的。领班由所有顾客“共享”,不能被任何特定顾客占用太多时间。当顾客在一张桌子入坐后,有一个侍应生专门为这张桌子服务。&
对于上面罗列的这种模型,本文讨论的问题是当领班接到客人时,如何高效率的通知侍应生去服务顾客.&
在我们使用很广泛的线程池实现中,也会有一样的问题
方法实现:
1.使用锁+轮询
使用这种方法可以很简单的实现,但是会有一定的性能消耗,其还有一个点要好好把握,就是一次轮询没有结果后相隔多久进行下一次的轮询,间隔时间太短,消耗的CPU资源较多,间隔时间太长,不能很及时的响应请求。这就相当于上面的这个例子,侍应生时不时的取询问领班有没有顾客到来
2.使用条件变量的线程同步
线程条件变量pthread_cond_t
线程等待某个条件
int pthread_cond_timedwait(pthread_cond_t *restrict&cond,pthread_mutex_t *restrict&mutex,const
struct timespec *restrictabstime);
int pthread_cond_wait(pthread_cond_t *restrict&cond,pthread_mutex_t *restrict&mutex);&
通知所有的线程
int pthread_cond_broadcast(pthread_cond_t *cond);&
只通知一个线程
int pthread_cond_signal(pthread_cond_t *cond);&
正确的使用方法
&  pthread_cond_wait用法:
pthread_mutex_lock(&mutex);
while(condition_is_false)
&pthread_cond_wait(&cond,&mutex);
condition_is_false=& //此操作是带锁的,也就是说只有一个线程同时进入这块
pthread_mutex_unlock(&mutex);
pthread_cond_signal用法:&
pthread_mutex_lock(&mutex);
condition_is_false=
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
我刚初用的时候,觉得非常的奇怪,为什么要这样用,加了mutex后还需要一个condition_is_false变量来表示有没有活干。其实这样子的一个操作主要是为了解决“假激活”问题,因为我么您这里的使用场景,只需要激活一个线程,因为一个线程干一个活,而不是多个线程干一个活,所以为了避免线程被激活了,但实际又没有事情干,所以使用了这么一套机制。
实际上,信号和pthread_cond_broadcast是两个常见的导致假唤醒的情况。假如条件变量上有多个线程在等待,pthread_cond_broadcast会唤醒所有的等待线程,而pthread_cond_signal只会唤醒其中一个等待线程。这样,pthread_cond_broadcast的情况也许要在pthread_cond_wait前使用while循环来检查条件变量。&
来个例子:
事实上上面的例子无论是使用pthread_cond_signal还是pthread_cond_broadcast,都只会打印Thread&awake,&finish&work5次,大家可能会觉得非常奇怪,但是实际情况就是这样的。&为了明白其pthread_cont_wait内部干了什么工作,有必要深入一下其内部实现。
关于其内部实现伪代码如下:
1 pthread_cond_wait(mutex, cond):
value = cond-& /* 1 */
pthread_mutex_unlock(mutex); /* 2 */
pthread_mutex_lock(cond-&mutex); /* 10 */
pthread_cond_t自带一个mutex来互斥对waiter等待链表的操作
if (value == cond-&value) { /* 11 */
检查一次是不是cond有被其他线程设置过,相当于单例模式的第二次检测是否为NULL
me-&next_cond = cond-&
cond-&waiter =//链表操作
pthread_mutex_unlock(cond-&mutex);
unable_to_run(me);
pthread_mutex_unlock(cond-&mutex); /* 12 */
pthread_mutex_lock(mutex); /* 13 */
14 pthread_cond_signal(cond):
pthread_mutex_lock(cond-&mutex); /* 3 */
cond-&value++; /* 4 */
if (cond-&waiter) { /* 5 */
sleeper = cond-& /* 6 */
cond-&waiter = sleeper-&next_ /* 7 */
//链表操作
able_to_run(sleeper); /* 8 */
运行sleep的线程,即上面的me
pthread_mutex_unlock(cond-&mutex); /* 9 */
pthread_cond_broadcast虽然能够激活所有的线程,但是激活之后会有mutex锁,也就是说他的激活是顺序进行的,只有第一个激活的线程调用pthread_mutex_unlock(&mutex)后,后一个等待的线程才会继续运行.因为从pthread_cond_wait(&cond,&mutex)到pthread_mutex_unlock(&mutex)区间是加的独占锁,从wait激活后的第一个线程占用了这个锁,所以其他的线程不能运行,只能等待。所以当第一个被激活的线程修改了condition_is_false后(上面测试代码的workToDo),接着调用pthread_mutex_unlock(&mutex)后,此时其他等待在cond的一个线程会激活,但是此时condition_is_false已经被设置,所以他跑不出while循环,当调用pthread_cond_wait时,其内部pthread_mutex_unlock(mutex)调用会导致另一个在它后面的等待在cond的线程被激活。  
所以,通过这种方式,即便是误调用了pthread_cond_broadcast或者由于信号中断的原因激活了所有在等待条件的线程,也能保证其结果是正确的。
另外说一句题外话,很多人写的基于条件变量线程同步的框架,说自己是无锁的,其实这是不对的,只是内部锁的机制在pthread_cond_wait实现了而已,其还是基于互斥锁的实现。真正想要达到无锁的可以关注一下lockfree相关的CAS算法,其内部使用一个intel
CPU的cmpxchg8指令完成的,其实这种实现个人认为和传统锁相比只是一个非阻塞锁和阻塞锁的区别。
版权所有 爱编程 (C) Copyright 2012. . All Rights Reserved.
闽ICP备号-3
微信扫一扫关注爱编程,每天为您推送一篇经典技术文章。

我要回帖

更多关于 多线程共享内存 的文章

 

随机推荐